Query And Fetch & Query Then Fetch & DFS Query And Fetch & DFS Query Then Fetch
Published: 2019-06-22


Elasticsearch supports four regular search types (SCAN and COUNT are deprecated):

  • Query And Fetch (Q_A_F)
  • Query Then Fetch (Q_T_F)
  • DFS Query And Fetch (DFS_Q_A_F)
  • DFS Query Then Fetch (DFS_Q_T_F)

 

DFS

Elasticsearch uses the TF-IDF statistic to weigh how important a term is to a retrieved document. The idea is:

If a term or phrase occurs frequently in one document (high TF) but rarely appears in the rest of the corpus, it is considered a good discriminator between documents and is given a high weight.
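As a rough illustration of the idea (a minimal sketch only; Lucene's real scoring formula adds length normalization, boosts, and other factors), TF-IDF can be computed like this:

```java
import java.util.Arrays;
import java.util.List;

public class TfIdfSketch {
    // tf: how often the term occurs in this document, normalized by document length
    static double tf(List<String> doc, String term) {
        long count = doc.stream().filter(term::equals).count();
        return (double) count / doc.size();
    }

    // idf: log of (total docs / docs containing the term); rarer terms weigh more
    static double idf(List<List<String>> corpus, String term) {
        long docFreq = corpus.stream().filter(d -> d.contains(term)).count();
        if (docFreq == 0) return 0; // term absent from the corpus: no signal
        return Math.log((double) corpus.size() / docFreq);
    }

    static double tfIdf(List<String> doc, List<List<String>> corpus, String term) {
        return tf(doc, term) * idf(corpus, term);
    }

    public static void main(String[] args) {
        List<List<String>> corpus = Arrays.asList(
                Arrays.asList("es", "shard", "query"),
                Arrays.asList("es", "es", "fetch"),
                Arrays.asList("java", "code"));
        // "es" appears in 2 of 3 docs, "java" in only 1 of 3,
        // so "java" is the better discriminator and gets the higher weight
        System.out.println(tfIdf(corpus.get(2), corpus, "java"));
        System.out.println(tfIdf(corpus.get(0), corpus, "es"));
    }
}
```

The `idf` factor grows as the term appears in fewer documents, so a term concentrated in one document outweighs a term spread across the whole corpus.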

For the details of the algorithm, see:

 

    In Elasticsearch this algorithm runs into a problem under distributed search. Suppose a cluster has five shards, and the term being searched occurs frequently in only one of them. When each of the five shards scores its documents using only its own local frequencies, the scores are computed on different scales, so documents end up ranked partly by which shard they happen to live on rather than by relevance. That is not the result we want.

    A simple way to solve this is to score documents with global term/document frequencies, which brings the ranking closer to reality. In practice the default Query Then Fetch is usually sufficient; only consider the DFS variants when the results differ noticeably from what you expect.
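To see how much local statistics can skew things, here is a hypothetical sketch (shard sizes and frequencies are invented numbers) comparing a shard-local IDF with the global IDF a DFS prequery would produce:

```java
public class DfsIdfSketch {
    // idf computed from one shard's local statistics only
    static double localIdf(int shardDocs, int shardDocFreq) {
        return Math.log((double) shardDocs / shardDocFreq);
    }

    public static void main(String[] args) {
        // term "foo": shard A holds 100 docs, 90 of which contain the term;
        // shard B holds 100 docs, only 1 of which contains it
        double idfA = localIdf(100, 90); // ~0.105 - term looks common, low weight
        double idfB = localIdf(100, 1);  // ~4.605 - term looks rare, high weight

        // DFS prequery: sum the frequencies first (200 docs, df=91),
        // then every shard scores with the same value
        double globalIdf = localIdf(200, 91); // ~0.787 on both shards

        System.out.printf("local A=%.3f local B=%.3f global=%.3f%n", idfA, idfB, globalIdf);
    }
}
```

With local statistics, the single match on shard B looks like a rare, high-value term and dominates the merged ranking; with the shared global IDF, both shards score on the same scale.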

Query Then Fetch

  • Send the query to each shard
  • Find all matching documents and calculate scores using local Term/Document Frequencies
  • Build a priority queue of results (sort, pagination with from/to, etc)
  • Return metadata about the results to requesting node. Note, the actual document is not sent yet, just the scores
  • Scores from all the shards are merged and sorted on the requesting node, docs are selected according to query criteria
  • Finally, the actual docs are retrieved from individual shards where they reside.
  • Results are returned to the client
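The flow above can be sketched as a toy simulation (all class and method names here are invented for illustration, not Elasticsearch's API): in phase one each shard returns only (docId, score) pairs, the coordinating node merges them into a top-k priority queue, and only those winners are fetched in phase two:

```java
import java.util.*;

public class QueryThenFetchSketch {
    record Hit(int shard, int docId, double score) {}

    // phase 1: each shard returns scored doc ids only (no document bodies yet)
    static List<Hit> queryShard(int shard, Map<Integer, Double> localScores) {
        List<Hit> hits = new ArrayList<>();
        localScores.forEach((docId, score) -> hits.add(new Hit(shard, docId, score)));
        return hits;
    }

    // coordinating node: merge all shard results, keep only the global top-k
    static List<Hit> mergeTopK(List<List<Hit>> perShard, int k) {
        PriorityQueue<Hit> queue = new PriorityQueue<>(Comparator.comparingDouble(Hit::score));
        for (List<Hit> hits : perShard) {
            for (Hit h : hits) {
                queue.add(h);
                if (queue.size() > k) queue.poll(); // evict the current lowest score
            }
        }
        List<Hit> top = new ArrayList<>(queue);
        top.sort(Comparator.comparingDouble(Hit::score).reversed());
        return top;
    }

    public static void main(String[] args) {
        List<List<Hit>> phase1 = List.of(
                queryShard(0, Map.of(1, 0.9, 2, 0.2)),
                queryShard(1, Map.of(7, 0.5, 9, 0.8)));
        // phase 2 would now fetch only these documents from their owning shards
        for (Hit h : mergeTopK(phase1, 2)) {
            System.out.println("fetch doc " + h.docId() + " from shard " + h.shard());
        }
    }
}
```

Note that only k documents are ever fetched, however many shards matched, which is the whole point of splitting query from fetch.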

DFS Query Then Fetch

  • Prequery each shard asking about Term and Document frequencies
  • Send the query to each shard
  • Find all matching documents and calculate scores using global Term/Document Frequencies calculated from the prequery.
  • Build a priority queue of results (sort, pagination with from/to, etc)
  • Return metadata about the results to requesting node. Note, the actual document is not sent yet, just the scores
  • Scores from all the shards are merged and sorted on the requesting node, docs are selected according to query criteria
  • Finally, the actual docs are retrieved from individual shards where they reside.
  • Results are returned to the client
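The extra prequery in the first step can be sketched as a frequency aggregation (names invented for illustration): each shard reports its local document count and per-term document frequencies, and the coordinating node sums them so that every shard later scores with the same global numbers:

```java
import java.util.*;

public class DfsPrequerySketch {
    // what each shard returns from the prequery: its doc count and per-term doc frequencies
    record ShardStats(long docCount, Map<String, Long> docFreq) {}

    // coordinating node: sum the per-shard statistics into global ones
    static ShardStats aggregate(List<ShardStats> shards) {
        long docs = 0;
        Map<String, Long> freq = new HashMap<>();
        for (ShardStats s : shards) {
            docs += s.docCount();
            s.docFreq().forEach((term, df) -> freq.merge(term, df, Long::sum));
        }
        return new ShardStats(docs, freq);
    }

    public static void main(String[] args) {
        ShardStats global = aggregate(List.of(
                new ShardStats(100, Map.of("foo", 90L)),
                new ShardStats(100, Map.of("foo", 1L))));
        // every shard now scores "foo" with docCount=200, df=91
        System.out.println(global);
    }
}
```

This round trip is the cost of DFS: one extra network hop per search in exchange for shard-independent scoring.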

Official blog post:

Chinese translation:

 

Q_A_F vs. Q_T_F

    There is little written about this distinction, but we can read the source code and observe how the two differ in practice.

In org.elasticsearch.action.search.TransportSearchAction you can see that when the number of shards is 1, ES rewrites the search type to Query And Fetch.

@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    // optimize search type for cases where there is only one shard group to search on
    if (optimizeSingleShard && searchRequest.searchType() != SCAN && searchRequest.searchType() != COUNT) {
        try {
            ClusterState clusterState = clusterService.state();
            String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest);
            Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices());
            int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap);
            if (shardCount == 1) {
                // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
                searchRequest.searchType(QUERY_AND_FETCH);
            }
        } catch (IndexNotFoundException | IndexClosedException e) {
            // ignore these failures, we will notify the search response if its really the case from the actual action
        } catch (Exception e) {
            logger.debug("failed to optimize search type, continue as normal", e);
        }
    }

    AbstractSearchAsyncAction searchAsyncAction;
    switch (searchRequest.searchType()) {
        case DFS_QUERY_THEN_FETCH:
            searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case QUERY_THEN_FETCH:
            searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case DFS_QUERY_AND_FETCH:
            searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case QUERY_AND_FETCH:
            searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case SCAN:
            searchAsyncAction = new SearchScanAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case COUNT:
            searchAsyncAction = new SearchCountAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        default:
            throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
    }
    searchAsyncAction.start();
}

 

In the corresponding Action classes:

In the first phase, Q_A_F executes a Fetch directly to retrieve the data, whereas Q_T_F executes only a Query;

In the second phase, Q_A_F merges the results straight away, whereas Q_T_F iterates over the shards, executing a Fetch on each one to retrieve the documents before merging;

The upshot: because Q_A_F targets a single shard, it does not need to retrieve data in two stages. Q_T_F must first collect per-shard document metadata, process it, and only then fetch the actual documents.

Q_A_F

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
                                     ActionListener<QueryFetchSearchResult> listener) {
    searchService.sendExecuteFetch(node, request, listener);
}

@Override
protected void moveToSecondPhase() throws Exception {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            boolean useScroll = request.scroll() != null;
            sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults, request);
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
                    successfulOps.get(), buildTookInMillis(), buildShardFailures()));
        }

        @Override
        public void onFailure(Throwable t) {
            ......
        }
    });
}

 

Q_T_F

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
                                     ActionListener<QuerySearchResultProvider> listener) {
    searchService.sendExecuteQuery(node, request, listener);
}

@Override
protected void moveToSecondPhase() throws Exception {
    boolean useScroll = request.scroll() != null;
    sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
    searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);

    if (docIdsToLoad.asList().isEmpty()) {
        finishHim();
        return;
    }

    final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
            request, sortedShardList, firstResults.length()
    );
    final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
    for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
        QuerySearchResultProvider queryResult = firstResults.get(entry.index);
        DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
        executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
    }
}

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
                  final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
    searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
        @Override
        public void onResponse(FetchSearchResult result) {
            result.shardTarget(shardTarget);
            fetchResults.set(shardIndex, result);
            if (counter.decrementAndGet() == 0) {
                finishHim();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            // the search context might not be cleared on the node where the fetch was executed for example
            // because the action was rejected by the thread pool. in this case we need to send a dedicated
            // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
            // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
            docIdsToLoad.set(shardIndex, null);
            onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
        }
    });
}

private void finishHim() {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults, request);
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
                    successfulOps.get(), buildTookInMillis(), buildShardFailures()));
            releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
                if (logger.isDebugEnabled()) {
                    logger.debug("failed to reduce search", failure);
                }
                super.onFailure(failure);
            } finally {
                releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
            }
        }
    });
}

 

Reposted from: https://my.oschina.net/SEyanlei/blog/967590
