ES offers four standard search types (SCAN and COUNT are already deprecated):
- Query And Fetch (Q_A_F)
- Query Then Fetch (Q_T_F)
- DFS Query And Fetch (DFS_Q_A_F)
- DFS Query Then Fetch (DFS_Q_T_F)
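The type is chosen per request. As a minimal sketch, assuming an ES 2.x-era TransportClient (the `client`, index, and field names here are placeholders):

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

public class SearchTypeDemo {
    // `client` is an already-connected Client; index and field names are made up
    static SearchResponse search(Client client) {
        return client.prepareSearch("articles")
                // QUERY_THEN_FETCH is the default; DFS_QUERY_THEN_FETCH etc. are set the same way
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.matchQuery("title", "elasticsearch"))
                .get();
    }
}
```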
DFS
ES uses TF-IDF, a statistical method for judging how important a term is to a retrieved document. The idea is:
if a word or phrase occurs frequently in one document (a high TF) but rarely in other documents, it is considered to discriminate well between categories and is therefore well suited for classification.
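For reference, the TF-IDF core that Lucene's classic similarity (what ES used at the time) builds on looks roughly like this; a simplified sketch that omits norms, boosts, and the query norm:

$$\mathrm{tf}(t,d) = \sqrt{\mathrm{freq}(t,d)}, \qquad \mathrm{idf}(t) = 1 + \ln\frac{N}{\mathrm{df}(t)+1}, \qquad \mathrm{score}(q,d) \approx \sum_{t \in q} \mathrm{tf}(t,d)\cdot \mathrm{idf}(t)^2$$

Here $N$ is the number of documents and $\mathrm{df}(t)$ the number of documents containing $t$; the crux for what follows is that, by default, $N$ and $\mathrm{df}(t)$ are per-shard values.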
The details of the algorithm can be found here:
In ES this algorithm runs into a problem: score distribution across shards. Suppose a cluster has five shards and the queried term is concentrated in only one of them. Each of the five shards scores its matches using only its own local term/document frequencies, so the scores coming back from the term-heavy shard are computed on a different basis than those from the other four, and the merged ranking is skewed. This is not the result we expect.
A simple fix is to score documents using global term frequencies, which brings the results closer to actual usage. In most cases the default Query Then Fetch is already sufficient; reach for DFS only when the search results deviate noticeably from expectations.
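To see the size of the skew with some made-up numbers: suppose a term appears in 90 of shard A's 100 documents but in only 1 of shard B's 100. Locally, shard A computes $\mathrm{idf} = 1 + \ln(100/91) \approx 1.09$ while shard B computes $\mathrm{idf} = 1 + \ln(100/2) \approx 4.91$, so an otherwise identical match on shard B scores several times higher. With globally gathered statistics both shards would use $\mathrm{idf} = 1 + \ln(200/92) \approx 1.78$.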
Query Then Fetch
- Send the query to each shard
- Find all matching documents and calculate scores using local Term/Document Frequencies
- Build a priority queue of results (sort, pagination with from/to, etc)
- Return metadata about the results to requesting node. Note, the actual document is not sent yet, just the scores
- Scores from all the shards are merged and sorted on the requesting node, docs are selected according to query criteria
- Finally, the actual docs are retrieved from individual shards where they reside.
- Results are returned to the client
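The steps above condense into a toy, self-contained simulation of the coordinator's merge (plain Java 16+, not ES code; shard results and scores are invented): the query phase returns only ids and scores, and documents are fetched only for the global top-k.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class QueryThenFetchSketch {
    // one query-phase hit: the shard it lives on, its doc id, and its score
    record Hit(int shard, int doc, double score) {}

    public static void main(String[] args) {
        int k = 3; // global "size"
        // hypothetical query-phase results, already sorted per shard (scores only, no documents)
        List<List<Hit>> shardResults = List.of(
                List.of(new Hit(0, 11, 2.4), new Hit(0, 7, 1.9), new Hit(0, 2, 0.5)),
                List.of(new Hit(1, 3, 3.1), new Hit(1, 9, 1.2), new Hit(1, 5, 0.9)));

        // coordinator-side merge: sort all score metadata and keep the global top-k
        List<Hit> topK = shardResults.stream()
                .flatMap(List::stream)
                .sorted(Comparator.comparingDouble(Hit::score).reversed())
                .limit(k)
                .collect(Collectors.toList());

        // fetch phase: only now are actual documents pulled, and only for the winners
        for (Hit h : topK) {
            System.out.printf("fetch doc %d from shard %d (score %.1f)%n", h.doc(), h.shard(), h.score());
        }
    }
}
```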
DFS Query Then Fetch
- Prequery each shard asking about Term and Document frequencies
- Send the query to each shard
- Find all matching documents and calculate scores using global Term/Document Frequencies calculated from the prequery.
- Build a priority queue of results (sort, pagination with from/to, etc)
- Return metadata about the results to requesting node. Note, the actual document is not sent yet, just the scores
- Scores from all the shards are merged and sorted on the requesting node, docs are selected according to query criteria
- Finally, the actual docs are retrieved from individual shards where they reside.
- Results are returned to the client
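And a matching sketch of what the DFS prequery buys (again a toy, not ES code; it reuses the simplified idf formula and the invented numbers from the earlier example): aggregate each shard's term/document frequencies first, then let every shard score with the same global idf.

```java
public class DfsPrequerySketch {
    public static void main(String[] args) {
        // hypothetical per-shard statistics for one term: {docs containing it, total docs}
        int[][] shardStats = { {90, 100}, {1, 100} };

        // prequery phase: aggregate term/document frequencies across all shards
        int globalDf = 0;
        int globalDocs = 0;
        for (int[] s : shardStats) {
            globalDf += s[0];
            globalDocs += s[1];
        }
        double globalIdf = 1 + Math.log((double) globalDocs / (globalDf + 1));

        // query phase: every shard now scores with the same global idf instead of its local one
        for (int i = 0; i < shardStats.length; i++) {
            double localIdf = 1 + Math.log((double) shardStats[i][1] / (shardStats[i][0] + 1));
            System.out.printf("shard %d: local idf %.2f vs global idf %.2f%n", i, localIdf, globalIdf);
        }
    }
}
```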
Official blog:
Chinese translation:
Q_A_F vs Q_T_F
There is relatively little written about this pair, but by reading the source we can observe how the two differ in practice.
In org.elasticsearch.action.search.TransportSearchAction you can see that when the number of shards to search is 1, ES rewrites the search type to Query And Fetch:
```java
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    // optimize search type for cases where there is only one shard group to search on
    if (optimizeSingleShard && searchRequest.searchType() != SCAN && searchRequest.searchType() != COUNT) {
        try {
            ClusterState clusterState = clusterService.state();
            String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest);
            Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices());
            int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap);
            if (shardCount == 1) {
                // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
                searchRequest.searchType(QUERY_AND_FETCH);
            }
        } catch (IndexNotFoundException | IndexClosedException e) {
            // ignore these failures, we will notify the search response if its really the case from the actual action
        } catch (Exception e) {
            logger.debug("failed to optimize search type, continue as normal", e);
        }
    }

    AbstractSearchAsyncAction searchAsyncAction;
    switch (searchRequest.searchType()) {
        case DFS_QUERY_THEN_FETCH:
            searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case QUERY_THEN_FETCH:
            searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case DFS_QUERY_AND_FETCH:
            searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case QUERY_AND_FETCH:
            searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case SCAN:
            searchAsyncAction = new SearchScanAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case COUNT:
            searchAsyncAction = new SearchCountAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        default:
            throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
    }
    searchAsyncAction.start();
}
```
In the corresponding async actions:
In the first phase, Q_A_F retrieves the data with a Fetch (sendExecuteFetch, query and fetch in one pass), while Q_T_F executes only the Query (sendExecuteQuery);
In the second phase, Q_A_F merges the results directly, while Q_T_F walks the shards, executes a Fetch against each to retrieve the documents, and then merges.
The upshot: because Q_A_F is hitting a single node, it does not need a two-step retrieval; Q_T_F must first obtain document metadata from every shard and, after processing it, fetch the actual data.
Q_A_F
```java
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
                                     ActionListener<QueryFetchSearchResult> listener) {
    searchService.sendExecuteFetch(node, request, listener);
}

@Override
protected void moveToSecondPhase() throws Exception {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            boolean useScroll = request.scroll() != null;
            sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults, request);
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
                    successfulOps.get(), buildTookInMillis(), buildShardFailures()));
        }

        @Override
        public void onFailure(Throwable t) {
            // ......
        }
    });
}
```
Q_T_F
```java
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
                                     ActionListener<QuerySearchResultProvider> listener) {
    searchService.sendExecuteQuery(node, request, listener);
}

@Override
protected void moveToSecondPhase() throws Exception {
    boolean useScroll = request.scroll() != null;
    sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
    searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);

    if (docIdsToLoad.asList().isEmpty()) {
        finishHim();
        return;
    }

    final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
            request, sortedShardList, firstResults.length()
    );
    final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
    for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
        QuerySearchResultProvider queryResult = firstResults.get(entry.index);
        DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
        executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
    }
}

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
                  final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
    searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
        @Override
        public void onResponse(FetchSearchResult result) {
            result.shardTarget(shardTarget);
            fetchResults.set(shardIndex, result);
            if (counter.decrementAndGet() == 0) {
                finishHim();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            // the search context might not be cleared on the node where the fetch was executed for example
            // because the action was rejected by the thread pool. in this case we need to send a dedicated
            // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
            // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
            docIdsToLoad.set(shardIndex, null);
            onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
        }
    });
}

private void finishHim() {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults, request);
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
                    successfulOps.get(), buildTookInMillis(), buildShardFailures()));
            releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
                if (logger.isDebugEnabled()) {
                    logger.debug("failed to reduce search", failure);
                }
                super.onFailure(failure);
            } finally {
                releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
            }
        }
    });
}
```