dsmiley commented on code in PR #3418: URL: https://github.com/apache/solr/pull/3418#discussion_r2312543674
########## solr/core/src/java/org/apache/solr/handler/component/CombinedQueryComponent.java: ########## @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.common.params.GroupParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import 
org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.response.BasicResultContext; +import org.apache.solr.response.ResultContext; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocListAndSet; +import org.apache.solr.search.QueryResult; +import org.apache.solr.search.SolrReturnFields; +import org.apache.solr.search.SortSpec; +import org.apache.solr.search.combine.QueryAndResponseCombiner; +import org.apache.solr.search.combine.ReciprocalRankFusion; +import org.apache.solr.util.SolrResponseUtil; +import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The CombinedQueryComponent class extends QueryComponent and provides support for executing + * multiple queries and combining their results. 
+ */ +public class CombinedQueryComponent extends QueryComponent implements SolrCoreAware { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String COMPONENT_NAME = "combined_query"; + protected NamedList<?> initParams; + private final Map<String, QueryAndResponseCombiner> combiners = new HashMap<>(); + private int maxCombinerQueries; + + @Override + public void init(NamedList<?> args) { + super.init(args); + this.initParams = args; + this.maxCombinerQueries = CombinerParams.DEFAULT_MAX_COMBINER_QUERIES; + } + + @Override + public void inform(SolrCore core) { + if (initParams != null && initParams.size() > 0) { + for (Map.Entry<String, ?> initEntry : initParams) { + if ("combiners".equals(initEntry.getKey()) + && initEntry.getValue() instanceof NamedList<?> all) { + for (int i = 0; i < all.size(); i++) { + String name = all.getName(i); + NamedList<?> combinerConfig = (NamedList<?>) all.getVal(i); + String className = (String) combinerConfig.get("class"); + QueryAndResponseCombiner combiner = + core.getResourceLoader().newInstance(className, QueryAndResponseCombiner.class); + combiner.init(combinerConfig); + combiners.compute( + name, + (k, existingCombiner) -> { + if (existingCombiner == null) { + return combiner; + } + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Found more than one combiner with same name"); + }); + } + } + } + Object maxQueries = initParams.get("maxCombinerQueries"); + if (maxQueries != null) { + this.maxCombinerQueries = Integer.parseInt(maxQueries.toString()); + } + } + combiners.computeIfAbsent( + CombinerParams.RECIPROCAL_RANK_FUSION, + key -> { + ReciprocalRankFusion reciprocalRankFusion = new ReciprocalRankFusion(); + reciprocalRankFusion.init(initParams); + return reciprocalRankFusion; + }); + } + + /** + * Overrides the prepare method to handle combined queries. 
+ * + * @param rb the ResponseBuilder to prepare + * @throws IOException if an I/O error occurs during preparation + */ + @Override + public void prepare(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { + SolrParams params = crb.req.getParams(); + if (params.get(CursorMarkParams.CURSOR_MARK_PARAM) != null + || params.getBool(GroupParams.GROUP, false)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "Unsupported functionality for Combined Queries."); + } + String[] queriesToCombineKeys = params.getParams(CombinerParams.COMBINER_QUERY); + if (queriesToCombineKeys.length > maxCombinerQueries) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Too many queries to combine: limit is " + maxCombinerQueries); + } + for (String queryKey : queriesToCombineKeys) { + final var unparsedQuery = params.get(queryKey); + ResponseBuilder rbNew = new ResponseBuilder(rb.req, new SolrQueryResponse(), rb.components); + rbNew.setQueryString(unparsedQuery); + super.prepare(rbNew); + crb.responseBuilders.add(rbNew); + } + } + super.prepare(rb); + } + + /** + * Overrides the process method to handle CombinedQueryResponseBuilder instances. This method + * processes the responses from multiple shards, combines them using the specified + * QueryAndResponseCombiner strategy, and sets the appropriate results and metadata in the + * CombinedQueryResponseBuilder. + * + * @param rb the ResponseBuilder object to process + * @throws IOException if an I/O error occurs during processing + */ + @Override + public void process(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { + boolean partialResults = false; + boolean segmentTerminatedEarly = false; + boolean setMaxHitsTerminatedEarly = false; + List<QueryResult> queryResults = new ArrayList<>(); + for (ResponseBuilder rbNow : crb.responseBuilders) { Review Comment: why the name "rbNow"? 
The "rb" part is obvious but not "Now". ########## solr/solr-ref-guide/modules/query-guide/pages/json-combined-query-dsl.adoc: ########## @@ -0,0 +1,113 @@ += JSON Combined Query DSL +:tabs-sync-option: +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +The Combined Query feature aims to execute multiple queries of multiple kinds across multiple shards of a collection and combine their result basis an algorithm (like Reciprocal Rank Fusion). +It is extending JSON Query DSL ultimately enabling Hybrid Search. + +[NOTE] +==== +This feature is currently unsupported for grouping and Cursors. In User-Managed (aka Standalone) Mode, the `shards` parameter must be provided to enable this feature. +==== + +== Query DSL Structure +The query structure is similar to JSON Query DSL except for how multiple queries are defined along with their parameters. + +* Multiple queries can be defined under the `queries` key by providing their name with the same syntax as a single query is defined with the key `query`. +* In addition to the other supported parameters, there are several parameters which can be defined under `params` key as below: +`combiner` | Default: `false`:: + Enables the combined query mode when set to `true`. 
+`combiner.query`:: + The list of queries to be executed as defined in the `queries` key. Example: `["query1", "query2"]` +`combiner.algorithm` | Default: `rrf`:: + The algorithm to be used for combining the results. Reciprocal Rank Fusion (RRF) is the in-built fusion algorithm. + Any other algorithm can be configured using xref:json-combined-query-dsl.adoc#combiner-algorithm-plugin[plugin]. +`combiner.rrf.k` | Default: `60`:: + The k parameter in the RRF algorithm. + +=== Example + +Below is a sample JSON query payload: + +``` +{ + "queries": { + "lexical1": { + "lucene": { + "query": "title:sales" + } + }, + "lexical2": { + "lucene": { + "query": "title:report" + } + }, + "vector": { + "knn": { + "f": "vector", + "topK" :5, + "query": "[0.1,-0.34,0.89,0.02]" + } + } + }, + "limit": 5, + "fields": ["id", "score", "title"], + "params": { + "combiner": true, + "combiner.query": ["lexical1", "vector"], + "combiner.algorithm": "rrf", + "combiner.rrf.k": "15" + } +} +``` + +== Search Handler Configuration + +Combined Query Feature has a separate handler with class `solr.CombinedQuerySearchHandler` which can be configured as below: + +``` +<requestHandler name="/search" class="solr.CombinedQuerySearchHandler"> +<bool name="httpCaching">true</bool> Review Comment: why specify httpCaching here? It's not unique to CombinedQuerySearchHandler ########## solr/core/src/java/org/apache/solr/handler/component/CombinedQueryComponent.java: ########## @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.common.params.GroupParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.response.BasicResultContext; +import org.apache.solr.response.ResultContext; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocListAndSet; +import org.apache.solr.search.QueryResult; +import 
org.apache.solr.search.SolrReturnFields; +import org.apache.solr.search.SortSpec; +import org.apache.solr.search.combine.QueryAndResponseCombiner; +import org.apache.solr.search.combine.ReciprocalRankFusion; +import org.apache.solr.util.SolrResponseUtil; +import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The CombinedQueryComponent class extends QueryComponent and provides support for executing + * multiple queries and combining their results. + */ +public class CombinedQueryComponent extends QueryComponent implements SolrCoreAware { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String COMPONENT_NAME = "combined_query"; + protected NamedList<?> initParams; + private final Map<String, QueryAndResponseCombiner> combiners = new HashMap<>(); + private int maxCombinerQueries; + + @Override + public void init(NamedList<?> args) { + super.init(args); + this.initParams = args; + this.maxCombinerQueries = CombinerParams.DEFAULT_MAX_COMBINER_QUERIES; + } + + @Override + public void inform(SolrCore core) { + if (initParams != null && initParams.size() > 0) { + for (Map.Entry<String, ?> initEntry : initParams) { + if ("combiners".equals(initEntry.getKey()) + && initEntry.getValue() instanceof NamedList<?> all) { + for (int i = 0; i < all.size(); i++) { + String name = all.getName(i); + NamedList<?> combinerConfig = (NamedList<?>) all.getVal(i); + String className = (String) combinerConfig.get("class"); + QueryAndResponseCombiner combiner = + core.getResourceLoader().newInstance(className, QueryAndResponseCombiner.class); + combiner.init(combinerConfig); + combiners.compute( + name, + (k, existingCombiner) -> { + if (existingCombiner == null) { + return combiner; + } + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Found more than one combiner with same name"); + }); + } + } + } + Object maxQueries = 
initParams.get("maxCombinerQueries"); + if (maxQueries != null) { + this.maxCombinerQueries = Integer.parseInt(maxQueries.toString()); + } + } + combiners.computeIfAbsent( + CombinerParams.RECIPROCAL_RANK_FUSION, + key -> { + ReciprocalRankFusion reciprocalRankFusion = new ReciprocalRankFusion(); + reciprocalRankFusion.init(initParams); + return reciprocalRankFusion; + }); + } + + /** + * Overrides the prepare method to handle combined queries. + * + * @param rb the ResponseBuilder to prepare + * @throws IOException if an I/O error occurs during preparation + */ + @Override + public void prepare(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { + SolrParams params = crb.req.getParams(); + if (params.get(CursorMarkParams.CURSOR_MARK_PARAM) != null + || params.getBool(GroupParams.GROUP, false)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "Unsupported functionality for Combined Queries."); + } + String[] queriesToCombineKeys = params.getParams(CombinerParams.COMBINER_QUERY); + if (queriesToCombineKeys.length > maxCombinerQueries) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Too many queries to combine: limit is " + maxCombinerQueries); + } + for (String queryKey : queriesToCombineKeys) { + final var unparsedQuery = params.get(queryKey); + ResponseBuilder rbNew = new ResponseBuilder(rb.req, new SolrQueryResponse(), rb.components); + rbNew.setQueryString(unparsedQuery); + super.prepare(rbNew); + crb.responseBuilders.add(rbNew); + } + } + super.prepare(rb); + } + + /** + * Overrides the process method to handle CombinedQueryResponseBuilder instances. This method + * processes the responses from multiple shards, combines them using the specified + * QueryAndResponseCombiner strategy, and sets the appropriate results and metadata in the + * CombinedQueryResponseBuilder. 
+ * + * @param rb the ResponseBuilder object to process + * @throws IOException if an I/O error occurs during processing + */ + @Override + public void process(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { Review Comment: it's a shame that this long method is indented 1 level just because of this check (contributes to cyclomatic complexity). Maybe an extract-method refactoring could best resolve that... ########## solr/core/src/test/org/apache/solr/handler/component/DistributedCombinedQueryComponentTest.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.solr.handler.component; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.solr.BaseDistributedSearchTestCase; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CommonParams; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The DistributedCombinedQueryComponentTest class is a JUnit test suite that evaluates the + * functionality of the CombinedQueryComponent in a Solr distributed search environment. It focuses + * on testing the integration of lexical and vector queries using the combiner component. + */ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class DistributedCombinedQueryComponentTest extends BaseDistributedSearchTestCase { + + private static final int NUM_DOCS = 10; Review Comment: Only 10 docs... this isn't very interesting. Ideally this distributed test would show the effect of rows=K (something small) where the query matches more than K documents, and thus test that we get back the top K out of more possible matches. This will influence faceting as well, since faceting operates on the entire numDocs (DocSet), not limited by K (AKA rows). ########## solr/solr-ref-guide/modules/query-guide/pages/json-combined-query-dsl.adoc: ########## @@ -0,0 +1,113 @@ += JSON Combined Query DSL +:tabs-sync-option: +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +The Combined Query feature aims to execute multiple queries of multiple kinds across multiple shards of a collection and combine their result basis an algorithm (like Reciprocal Rank Fusion). +It is extending JSON Query DSL ultimately enabling Hybrid Search. Review Comment: This statement implies that it can only be used with JSON Query DSL. Is this true? If not, don't make any reference to it here (it's still okay to show examples as you have done). If it is, can you please refer me to a line of code somewhere that couples them? I don't see it. ########## solr/core/src/java/org/apache/solr/search/combine/ReciprocalRankFusion.java: ########## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.solr.search.combine; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import org.apache.lucene.document.Document; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TotalHits; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.handler.component.ShardDoc; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.DocIterator; +import org.apache.solr.search.DocList; +import org.apache.solr.search.DocSlice; +import org.apache.solr.search.QueryResult; +import org.apache.solr.search.SolrDocumentFetcher; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.search.SortedIntDocSet; + +/** + * This class implements a query and response combiner that uses the Reciprocal Rank Fusion (RRF) + * algorithm to combine multiple ranked lists into a single ranked list. 
+ */ +public class ReciprocalRankFusion extends QueryAndResponseCombiner { + + private int k; + + public ReciprocalRankFusion() { + this.k = CombinerParams.DEFAULT_COMBINER_RRF_K; + } + + @Override + public void init(NamedList<?> args) { + Object kParam = args.get("k"); + if (kParam != null) { + this.k = Integer.parseInt(kParam.toString()); + } + } + + public int getK() { + return k; + } + + @Override + public QueryResult combine(List<QueryResult> rankedLists, SolrParams solrParams) { + int kVal = solrParams.getInt(CombinerParams.COMBINER_RRF_K, this.k); + List<DocList> docLists = getDocListsFromQueryResults(rankedLists); + QueryResult combinedResult = new QueryResult(); + combineResults(combinedResult, docLists, false, kVal); Review Comment: I see that only the DocList from each input QueryResult is being considered. The DocSet is ignored, and we instead pretend that the final/result DocSet is only those top documents as found in the resulting DocList. Consequently, DocSet-consuming components (like faceting) will return a small subset of the total results/counts that they _should_ see. If the query matches very little (as in your test), this won't be noticed. ########## solr/core/src/java/org/apache/solr/handler/component/CombinedQueryComponent.java: ########## @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.common.params.GroupParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.response.BasicResultContext; +import org.apache.solr.response.ResultContext; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocListAndSet; +import org.apache.solr.search.QueryResult; +import 
org.apache.solr.search.SolrReturnFields; +import org.apache.solr.search.SortSpec; +import org.apache.solr.search.combine.QueryAndResponseCombiner; +import org.apache.solr.search.combine.ReciprocalRankFusion; +import org.apache.solr.util.SolrResponseUtil; +import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The CombinedQueryComponent class extends QueryComponent and provides support for executing + * multiple queries and combining their results. + */ +public class CombinedQueryComponent extends QueryComponent implements SolrCoreAware { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String COMPONENT_NAME = "combined_query"; + protected NamedList<?> initParams; + private final Map<String, QueryAndResponseCombiner> combiners = new HashMap<>(); + private int maxCombinerQueries; + + @Override + public void init(NamedList<?> args) { + super.init(args); + this.initParams = args; + this.maxCombinerQueries = CombinerParams.DEFAULT_MAX_COMBINER_QUERIES; + } + + @Override + public void inform(SolrCore core) { + if (initParams != null && initParams.size() > 0) { Review Comment: honestly you can skip this useless check since init() *will* be called first. We must rely on Solr to do what it's supposed to do. ########## solr/core/src/test/org/apache/solr/handler/component/DistributedCombinedQueryComponentTest.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.solr.BaseDistributedSearchTestCase; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CommonParams; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The DistributedCombinedQueryComponentTest class is a JUnit test suite that evaluates the + * functionality of the CombinedQueryComponent in a Solr distributed search environment. It focuses + * on testing the integration of lexical and vector queries using the combiner component. + */ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class DistributedCombinedQueryComponentTest extends BaseDistributedSearchTestCase { + + private static final int NUM_DOCS = 10; + private static final String vectorField = "vector"; + + /** + * Sets up the test class by initializing the core and setting system properties. This method is + * executed before all test methods in the class. 
+ * + * @throws Exception if any exception occurs during initialization + */ + @BeforeClass + public static void setUpClass() throws Exception { + initCore("solrconfig-combined-query.xml", "schema-vector-catchall.xml"); + System.setProperty("validateAfterInactivity", "200"); + System.setProperty("solr.httpclient.retries", "0"); + System.setProperty("distribUpdateSoTimeout", "5000"); + } + + /** + * Prepares Solr input documents for indexing, including adding sample data and vector fields. + * This method populates the Solr index with test data, including text, title, and vector fields. + * The vector fields are used to calculate cosine distance for testing purposes. + * + * @throws Exception if any error occurs during the indexing process. + */ + private synchronized void prepareIndexDocs() throws Exception { + List<SolrInputDocument> docs = new ArrayList<>(); + fixShardCount(2); + for (int i = 1; i <= NUM_DOCS; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", Integer.toString(i)); + doc.addField("text", "test text for doc " + i); + doc.addField("title", "title test for doc " + i); + doc.addField("nullfirst", String.valueOf(i % 3)); Review Comment: that's an odd/confusing choice of fields to use. Maybe use "mod3_idv" ########## solr/core/src/java/org/apache/solr/handler/component/CombinedQueryComponent.java: ########## @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.common.params.GroupParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.response.BasicResultContext; +import org.apache.solr.response.ResultContext; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocListAndSet; +import org.apache.solr.search.QueryResult; +import 
org.apache.solr.search.SolrReturnFields; +import org.apache.solr.search.SortSpec; +import org.apache.solr.search.combine.QueryAndResponseCombiner; +import org.apache.solr.search.combine.ReciprocalRankFusion; +import org.apache.solr.util.SolrResponseUtil; +import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The CombinedQueryComponent class extends QueryComponent and provides support for executing + * multiple queries and combining their results. + */ +public class CombinedQueryComponent extends QueryComponent implements SolrCoreAware { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String COMPONENT_NAME = "combined_query"; + protected NamedList<?> initParams; + private final Map<String, QueryAndResponseCombiner> combiners = new HashMap<>(); + private int maxCombinerQueries; + + @Override + public void init(NamedList<?> args) { + super.init(args); + this.initParams = args; + this.maxCombinerQueries = CombinerParams.DEFAULT_MAX_COMBINER_QUERIES; + } + + @Override + public void inform(SolrCore core) { + if (initParams != null && initParams.size() > 0) { + for (Map.Entry<String, ?> initEntry : initParams) { + if ("combiners".equals(initEntry.getKey()) + && initEntry.getValue() instanceof NamedList<?> all) { + for (int i = 0; i < all.size(); i++) { + String name = all.getName(i); + NamedList<?> combinerConfig = (NamedList<?>) all.getVal(i); + String className = (String) combinerConfig.get("class"); + QueryAndResponseCombiner combiner = + core.getResourceLoader().newInstance(className, QueryAndResponseCombiner.class); + combiner.init(combinerConfig); + combiners.compute( + name, + (k, existingCombiner) -> { + if (existingCombiner == null) { + return combiner; + } + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Found more than one combiner with same name"); + }); + } + } + } + Object maxQueries = 
initParams.get("maxCombinerQueries"); + if (maxQueries != null) { + this.maxCombinerQueries = Integer.parseInt(maxQueries.toString()); + } + } + combiners.computeIfAbsent( + CombinerParams.RECIPROCAL_RANK_FUSION, + key -> { + ReciprocalRankFusion reciprocalRankFusion = new ReciprocalRankFusion(); + reciprocalRankFusion.init(initParams); + return reciprocalRankFusion; + }); + } + + /** + * Overrides the prepare method to handle combined queries. + * + * @param rb the ResponseBuilder to prepare + * @throws IOException if an I/O error occurs during preparation + */ + @Override + public void prepare(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { + SolrParams params = crb.req.getParams(); + if (params.get(CursorMarkParams.CURSOR_MARK_PARAM) != null + || params.getBool(GroupParams.GROUP, false)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "Unsupported functionality for Combined Queries."); + } + String[] queriesToCombineKeys = params.getParams(CombinerParams.COMBINER_QUERY); + if (queriesToCombineKeys.length > maxCombinerQueries) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Too many queries to combine: limit is " + maxCombinerQueries); + } + for (String queryKey : queriesToCombineKeys) { + final var unparsedQuery = params.get(queryKey); + ResponseBuilder rbNew = new ResponseBuilder(rb.req, new SolrQueryResponse(), rb.components); + rbNew.setQueryString(unparsedQuery); + super.prepare(rbNew); + crb.responseBuilders.add(rbNew); + } + } + super.prepare(rb); + } + + /** + * Overrides the process method to handle CombinedQueryResponseBuilder instances. This method + * processes the responses from multiple shards, combines them using the specified Review Comment: That is quite wrong. process() is local to the current index; it does *not* process responses from shards. We can say that we'll execute all of the queries provided and combine them. 
I would not speak of "shards" in this description. ########## solr/core/src/java/org/apache/solr/search/combine/ReciprocalRankFusion.java: ########## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.search.combine; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import org.apache.lucene.document.Document; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TotalHits; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.handler.component.ShardDoc; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.DocIterator; +import org.apache.solr.search.DocList; +import org.apache.solr.search.DocSlice; +import org.apache.solr.search.QueryResult; +import org.apache.solr.search.SolrDocumentFetcher; +import org.apache.solr.search.SolrIndexSearcher; +import 
org.apache.solr.search.SortedIntDocSet; + +/** + * The ReciprocalRankFusion class implements a query and response combiner that uses the Reciprocal + * Rank Fusion (RRF) algorithm to combine multiple ranked lists into a single ranked list. + */ +public class ReciprocalRankFusion extends QueryAndResponseCombiner { + + private int k; + + public int getK() { + return k; + } + + public ReciprocalRankFusion() { + this.k = CombinerParams.COMBINER_RRF_K_DEFAULT; + } + + @Override + public void init(NamedList<?> args) { + Object kParam = args.get("k"); + if (kParam != null) { + this.k = Integer.parseInt(kParam.toString()); + } + } + + @Override + public QueryResult combine(List<QueryResult> rankedLists, SolrParams solrParams) { + int kVal = solrParams.getInt(CombinerParams.COMBINER_RRF_K, this.k); + List<DocList> docLists = getDocListsFromQueryResults(rankedLists); + QueryResult combinedResult = new QueryResult(); + combineResults(combinedResult, docLists, false, kVal); + return combinedResult; + } + + private static List<DocList> getDocListsFromQueryResults(List<QueryResult> rankedLists) { + List<DocList> docLists = new ArrayList<>(rankedLists.size()); + for (QueryResult rankedList : rankedLists) { + docLists.add(rankedList.getDocList()); + } + return docLists; + } + + @Override + public List<ShardDoc> combine(Map<String, List<ShardDoc>> shardDocMap, SolrParams solrParams) { + int kVal = solrParams.getInt(CombinerParams.COMBINER_RRF_K, this.k); + HashMap<String, Float> docIdToScore = new HashMap<>(); + Map<String, ShardDoc> docIdToShardDoc = new HashMap<>(); + List<ShardDoc> finalShardDocList = new ArrayList<>(); + for (Map.Entry<String, List<ShardDoc>> shardDocEntry : shardDocMap.entrySet()) { + List<ShardDoc> shardDocList = shardDocEntry.getValue(); + int ranking = 1; + while (ranking <= shardDocList.size()) { + String docId = shardDocList.get(ranking - 1).id.toString(); + docIdToShardDoc.put(docId, shardDocList.get(ranking - 1)); + float rrfScore = 1f / (kVal + 
ranking); + docIdToScore.compute(docId, (id, score) -> (score == null) ? rrfScore : score + rrfScore); + ranking++; + } + } + List<Map.Entry<String, Float>> sortedByScoreDescending = + docIdToScore.entrySet().stream() + .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) + .toList(); + for (Map.Entry<String, Float> scoredDoc : sortedByScoreDescending) { + String docId = scoredDoc.getKey(); + Float score = scoredDoc.getValue(); + ShardDoc shardDoc = docIdToShardDoc.get(docId); + shardDoc.score = score; + finalShardDocList.add(shardDoc); + } + return finalShardDocList; + } + + private Map<Integer, Integer[]> combineResults( + QueryResult combinedRankedList, + List<DocList> rankedLists, + boolean saveRankPositionsForExplain, + int kVal) { + Map<Integer, Integer[]> docIdToRanks = null; + HashMap<Integer, Float> docIdToScore = new HashMap<>(); + long totalMatches = 0; + for (DocList rankedList : rankedLists) { + DocIterator docs = rankedList.iterator(); + totalMatches = Math.max(totalMatches, rankedList.matches()); + int ranking = 1; + while (docs.hasNext()) { + int docId = docs.nextDoc(); + float rrfScore = 1f / (kVal + ranking); + docIdToScore.compute(docId, (id, score) -> (score == null) ? 
rrfScore : score + rrfScore); + ranking++; + } + } + List<Map.Entry<Integer, Float>> sortedByScoreDescending = + docIdToScore.entrySet().stream() + .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) + .toList(); + + int combinedResultsLength = docIdToScore.size(); + int[] combinedResultsDocIds = new int[combinedResultsLength]; + float[] combinedResultScores = new float[combinedResultsLength]; + + int i = 0; + for (Map.Entry<Integer, Float> scoredDoc : sortedByScoreDescending) { + combinedResultsDocIds[i] = scoredDoc.getKey(); + combinedResultScores[i] = scoredDoc.getValue(); + i++; + } + + if (saveRankPositionsForExplain) { + docIdToRanks = getRanks(rankedLists, combinedResultsDocIds); + } + + DocSlice combinedResultSlice = + new DocSlice( + 0, + combinedResultsLength, + combinedResultsDocIds, + combinedResultScores, + Math.max(combinedResultsLength, totalMatches), + combinedResultScores.length > 0 ? combinedResultScores[0] : 0, + TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO); + combinedRankedList.setDocList(combinedResultSlice); + SortedIntDocSet docSet = new SortedIntDocSet(combinedResultsDocIds, combinedResultsLength); Review Comment: That was a helpful response; thanks. But I see one *big* (yet easily fixed) issue... exactly what the final DocSet should be: I see that only the DocList from each input QueryResult is being considered. The DocSet of each QueryResult is ignored, and we instead pretend that the final/result DocSet is only those top documents as found in the resulting DocList. Consequently, DocSet-consuming components (like faceting) will return a small subset of the total results/counts that they should see. If the query matches very little (as in your test), this won't be noticed. A fix would involve changing this method's signature to take the QueryResult (not just the DocList) of each query, and then taking the union of all of those DocSets using the utility methods on DocSet.
I'm assuming here that RRF doesn't filter out docs, it only ranks them. ########## solr/core/src/test/org/apache/solr/search/combine/ReciprocalRankFusionTest.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.search.combine; + +import static org.apache.solr.common.params.CombinerParams.COMBINER_RRF_K; +import static org.apache.solr.common.params.CombinerParams.RECIPROCAL_RANK_FUSION; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.search.TotalHits; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.handler.component.ShardDoc; +import org.apache.solr.search.DocSlice; +import org.apache.solr.search.QueryResult; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The ReciprocalRankFusionTest class is a unit test suite for the {@link ReciprocalRankFusion} + * class. 
It verifies the correctness of the fusion algorithm and its supporting methods. + */ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class ReciprocalRankFusionTest extends SolrTestCaseJ4 { Review Comment: Let's test DocSet combining ;-) ########## solr/core/src/java/org/apache/solr/handler/component/CombinedQueryComponent.java: ########## @@ -0,0 +1,572 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.lucene.search.Explanation; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CombinerParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.response.BasicResultContext; +import org.apache.solr.response.ResultContext; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocListAndSet; +import org.apache.solr.search.QueryResult; +import org.apache.solr.search.SolrReturnFields; +import org.apache.solr.search.SortSpec; +import org.apache.solr.search.combine.QueryAndResponseCombiner; +import org.apache.solr.search.combine.ReciprocalRankFusion; +import org.apache.solr.util.SolrResponseUtil; +import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The CombinedQueryComponent class extends QueryComponent and provides 
support for executing + * multiple queries and combining their results. + */ +public class CombinedQueryComponent extends QueryComponent implements SolrCoreAware { + + public static final String COMPONENT_NAME = "combined_query"; + protected NamedList<?> initParams; + private Map<String, QueryAndResponseCombiner> combiners = new ConcurrentHashMap<>(); + private int maxCombinerQueries; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + public void init(NamedList<?> args) { + super.init(args); + this.initParams = args; + this.maxCombinerQueries = CombinerParams.DEFAULT_MAX_COMBINER_QUERIES; + } + + @Override + public void inform(SolrCore core) { + if (initParams != null && initParams.size() > 0) { + log.info("Initializing CombinedQueryComponent"); + NamedList<?> all = (NamedList<?>) initParams.get("combiners"); + for (int i = 0; i < all.size(); i++) { + String name = all.getName(i); + NamedList<?> combinerConfig = (NamedList<?>) all.getVal(i); + String className = (String) combinerConfig.get("class"); + QueryAndResponseCombiner combiner = + core.getResourceLoader().newInstance(className, QueryAndResponseCombiner.class); + combiner.init(combinerConfig); + combiners.computeIfAbsent(name, combinerName -> combiner); + } + Object maxQueries = initParams.get("maxCombinerQueries"); + if (maxQueries != null) { + this.maxCombinerQueries = Integer.parseInt(maxQueries.toString()); + } + } + combiners.computeIfAbsent( + CombinerParams.RECIPROCAL_RANK_FUSION, + key -> { + ReciprocalRankFusion reciprocalRankFusion = new ReciprocalRankFusion(); + reciprocalRankFusion.init(initParams); + return reciprocalRankFusion; + }); + } + + /** + * Overrides the prepare method to handle combined queries. 
+ * + * @param rb the ResponseBuilder to prepare + * @throws IOException if an I/O error occurs during preparation + */ + @Override + public void prepare(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { + SolrParams params = crb.req.getParams(); + String[] queriesToCombineKeys = params.getParams(CombinerParams.COMBINER_QUERY); + if (queriesToCombineKeys.length > maxCombinerQueries) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Too many queries to combine: limit is " + maxCombinerQueries); + } + for (String queryKey : queriesToCombineKeys) { + final var unparsedQuery = params.get(queryKey); + ResponseBuilder rbNew = new ResponseBuilder(rb.req, new SolrQueryResponse(), rb.components); + rbNew.setQueryString(unparsedQuery); + super.prepare(rbNew); + crb.responseBuilders.add(rbNew); + } + } + super.prepare(rb); + } + + /** + * Overrides the process method to handle CombinedQueryResponseBuilder instances. This method + * processes the responses from multiple shards, combines them using the specified + * QueryAndResponseCombiner strategy, and sets the appropriate results and metadata in the + * CombinedQueryResponseBuilder. 
+ * + * @param rb the ResponseBuilder object to process + * @throws IOException if an I/O error occurs during processing + */ + @Override + public void process(ResponseBuilder rb) throws IOException { + if (rb instanceof CombinedQueryResponseBuilder crb) { + boolean partialResults = false; + boolean segmentTerminatedEarly = false; + boolean setMaxHitsTerminatedEarly = false; + List<QueryResult> queryResults = new ArrayList<>(); + for (ResponseBuilder rbNow : crb.responseBuilders) { + // propagating from global ResponseBuilder, so that if in case cursor is needed for + // retrieving the next batch of documents + // which might have duplicate results from the previous cursorMark as we are dealing with + // multiple queries + rbNow.setCursorMark(crb.getCursorMark()); + super.process(rbNow); + DocListAndSet docListAndSet = rbNow.getResults(); + QueryResult queryResult = new QueryResult(); + queryResult.setDocListAndSet(docListAndSet); + queryResults.add(queryResult); + partialResults |= queryResult.isPartialResults(); + if (queryResult.getSegmentTerminatedEarly() != null) { + segmentTerminatedEarly |= queryResult.getSegmentTerminatedEarly(); + } + if (queryResult.getMaxHitsTerminatedEarly() != null) { + setMaxHitsTerminatedEarly |= queryResult.getMaxHitsTerminatedEarly(); + } + } + QueryAndResponseCombiner combinerStrategy = + QueryAndResponseCombiner.getImplementation(rb.req.getParams(), combiners); + QueryResult combinedQueryResult = combinerStrategy.combine(queryResults, rb.req.getParams()); + combinedQueryResult.setPartialResults(partialResults); + combinedQueryResult.setSegmentTerminatedEarly(segmentTerminatedEarly); + combinedQueryResult.setMaxHitsTerminatedEarly(setMaxHitsTerminatedEarly); + crb.setResult(combinedQueryResult); + if (rb.isDebug()) { + String[] queryKeys = rb.req.getParams().getParams(CombinerParams.COMBINER_QUERY); + List<Query> queries = crb.responseBuilders.stream().map(ResponseBuilder::getQuery).toList(); + NamedList<Explanation> explanations 
= + combinerStrategy.getExplanations( + queryKeys, + queries, + queryResults, + rb.req.getSearcher(), + rb.req.getSchema(), + rb.req.getParams()); + rb.addDebugInfo("combinerExplanations", explanations); + } + ResultContext ctx = new BasicResultContext(crb); + crb.rsp.addResponse(ctx); + crb.rsp + .getToLog() + .add( + "hits", + crb.getResults() == null || crb.getResults().docList == null + ? 0 + : crb.getResults().docList.matches()); + if (!crb.req.getParams().getBool(ShardParams.IS_SHARD, false)) { + // for non-distributed request + if (null != crb.getNextCursorMark()) { + crb.rsp.add( + CursorMarkParams.CURSOR_MARK_NEXT, + crb.responseBuilders.getFirst().getNextCursorMark().getSerializedTotem()); + } + } + + if (crb.mergeFieldHandler != null) { + crb.mergeFieldHandler.handleMergeFields(crb, crb.req.getSearcher()); + } else { + doFieldSortValues(rb, crb.req.getSearcher()); + } + doPrefetch(crb); + } else { + super.process(rb); + } + } + + @Override + protected void mergeIds(ResponseBuilder rb, ShardRequest sreq) { + List<MergeStrategy> mergeStrategies = rb.getMergeStrategies(); + if (mergeStrategies != null) { + mergeStrategies.sort(MergeStrategy.MERGE_COMP); + boolean idsMerged = false; + for (MergeStrategy mergeStrategy : mergeStrategies) { + mergeStrategy.merge(rb, sreq); + if (mergeStrategy.mergesIds()) { + idsMerged = true; + } + } + + if (idsMerged) { + return; // ids were merged above so return. 
+ } + } + + SortSpec ss = rb.getSortSpec(); + + // If the shard request was also used to get fields (along with the scores), there is no reason + // to copy over the score dependent fields, since those will already exist in the document with + // the return fields + Set<String> scoreDependentFields; + if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) == 0) { + scoreDependentFields = + rb.rsp.getReturnFields().getScoreDependentReturnFields().keySet().stream() + .filter(field -> !field.equals(SolrReturnFields.SCORE)) + .collect(Collectors.toSet()); + } else { + scoreDependentFields = Collections.emptySet(); + } + + IndexSchema schema = rb.req.getSchema(); + SchemaField uniqueKeyField = schema.getUniqueKeyField(); + + // id to shard mapping, to eliminate any accidental dups + HashMap<Object, String> uniqueDoc = new HashMap<>(); + + NamedList<Object> shardInfo = null; + if (rb.req.getParams().getBool(ShardParams.SHARDS_INFO, false)) { + shardInfo = new SimpleOrderedMap<>(); + rb.rsp.getValues().add(ShardParams.SHARDS_INFO, shardInfo); + } + + long numFound = 0; + boolean hitCountIsExact = true; + Float maxScore = null; + boolean thereArePartialResults = false; + Boolean segmentTerminatedEarly = null; + boolean maxHitsTerminatedEarly = false; + long approximateTotalHits = 0; + int failedShardCount = 0; + Map<String, List<ShardDoc>> shardDocMap = new HashMap<>(); + for (ShardResponse srsp : sreq.responses) { + SolrDocumentList docs = null; + NamedList<?> responseHeader = null; + + if (shardInfo != null) { + SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>(); + + if (srsp.getException() != null) { + Throwable t = srsp.getException(); + if (t instanceof SolrServerException && t.getCause() != null) { + t = t.getCause(); + } + nl.add("error", t.toString()); + if (!rb.req.getCore().getCoreContainer().hideStackTrace()) { + StringWriter trace = new StringWriter(); + t.printStackTrace(new PrintWriter(trace)); + nl.add("trace", trace.toString()); + } + if 
(!StrUtils.isNullOrEmpty(srsp.getShardAddress())) { + nl.add("shardAddress", srsp.getShardAddress()); + } + } else { + responseHeader = + (NamedList<?>) + SolrResponseUtil.getSubsectionFromShardResponse( + rb, srsp, "responseHeader", false); + if (responseHeader == null) { + continue; + } + final Object rhste = + responseHeader.get(SolrQueryResponse.RESPONSE_HEADER_SEGMENT_TERMINATED_EARLY_KEY); + if (rhste != null) { + nl.add(SolrQueryResponse.RESPONSE_HEADER_SEGMENT_TERMINATED_EARLY_KEY, rhste); + } + final Object rhmhte = + responseHeader.get(SolrQueryResponse.RESPONSE_HEADER_MAX_HITS_TERMINATED_EARLY_KEY); + if (rhmhte != null) { + nl.add(SolrQueryResponse.RESPONSE_HEADER_MAX_HITS_TERMINATED_EARLY_KEY, rhmhte); + } + final Object rhath = + responseHeader.get(SolrQueryResponse.RESPONSE_HEADER_APPROXIMATE_TOTAL_HITS_KEY); + if (rhath != null) { + nl.add(SolrQueryResponse.RESPONSE_HEADER_APPROXIMATE_TOTAL_HITS_KEY, rhath); + } + docs = + (SolrDocumentList) + SolrResponseUtil.getSubsectionFromShardResponse(rb, srsp, "response", false); + if (docs == null) { + continue; + } + nl.add("numFound", docs.getNumFound()); + nl.add("numFoundExact", docs.getNumFoundExact()); + nl.add("maxScore", docs.getMaxScore()); + nl.add("shardAddress", srsp.getShardAddress()); + } + if (srsp.getSolrResponse() != null) { + nl.add("time", srsp.getSolrResponse().getElapsedTime()); + } + // This ought to be better, but at least this ensures no duplicate keys in JSON result + String shard = srsp.getShard(); + if (StrUtils.isNullOrEmpty(shard)) { + failedShardCount += 1; + shard = "unknown_shard_" + failedShardCount; + } + shardInfo.add(shard, nl); + } + // now that we've added the shard info, let's only proceed if we have no error. 
+ if (srsp.getException() != null) { + thereArePartialResults = true; + continue; + } + + if (docs == null) { // could have been initialized in the shards info block above + docs = + Objects.requireNonNull( + (SolrDocumentList) + SolrResponseUtil.getSubsectionFromShardResponse(rb, srsp, "response", false)); + } + + if (responseHeader == null) { // could have been initialized in the shards info block above + responseHeader = + Objects.requireNonNull( + (NamedList<?>) + SolrResponseUtil.getSubsectionFromShardResponse( + rb, srsp, "responseHeader", false)); + } + + final boolean thisResponseIsPartial; + thisResponseIsPartial = + Boolean.TRUE.equals( + responseHeader.getBooleanArg(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY)); + thereArePartialResults |= thisResponseIsPartial; + + if (!Boolean.TRUE.equals(segmentTerminatedEarly)) { + final Object ste = + responseHeader.get(SolrQueryResponse.RESPONSE_HEADER_SEGMENT_TERMINATED_EARLY_KEY); + if (Boolean.TRUE.equals(ste)) { + segmentTerminatedEarly = Boolean.TRUE; + } else if (Boolean.FALSE.equals(ste)) { + segmentTerminatedEarly = Boolean.FALSE; + } + } + + if (!maxHitsTerminatedEarly) { + if (Boolean.TRUE.equals( + responseHeader.get(SolrQueryResponse.RESPONSE_HEADER_MAX_HITS_TERMINATED_EARLY_KEY))) { + maxHitsTerminatedEarly = true; + } + } + Object ath = responseHeader.get(SolrQueryResponse.RESPONSE_HEADER_APPROXIMATE_TOTAL_HITS_KEY); + if (ath == null) { + approximateTotalHits += numFound; + } else { + approximateTotalHits += ((Number) ath).longValue(); + } + + // calculate global maxScore and numDocsFound + if (docs.getMaxScore() != null) { + maxScore = maxScore == null ? 
docs.getMaxScore() : Math.max(maxScore, docs.getMaxScore()); + } + numFound += docs.getNumFound(); + + if (hitCountIsExact && Boolean.FALSE.equals(docs.getNumFoundExact())) { + hitCountIsExact = false; + } + + @SuppressWarnings("unchecked") + NamedList<List<Object>> sortFieldValues = + (NamedList<List<Object>>) + SolrResponseUtil.getSubsectionFromShardResponse(rb, srsp, "sort_values", true); + if (null == sortFieldValues) { + sortFieldValues = new NamedList<>(); + } + + // if the SortSpec contains a field besides score or the Lucene docid, then the values will + // need to be unmarshalled from sortFieldValues. + boolean needsUnmarshalling = ss.includesNonScoreOrDocField(); + + // if we need to unmarshal the sortFieldValues for sorting but we have none, which can happen + // if partial results are being returned from the shard, then skip merging the results for the + // shard. This avoids an exception below. if the shard returned partial results but we don't + // need to unmarshal (a normal scoring query), then merge what we got. + if (thisResponseIsPartial && sortFieldValues.size() == 0 && needsUnmarshalling) { + continue; + } + + // Checking needsUnmarshalling saves on iterating the SortFields in the SortSpec again. + NamedList<List<Object>> unmarshalledSortFieldValues = + needsUnmarshalling ? unmarshalSortValues(ss, sortFieldValues, schema) : new NamedList<>(); + + // go through every doc in this response, construct a ShardDoc, and + // put it in the priority queue so it can be ordered. 
+ for (int i = 0; i < docs.size(); i++) { + SolrDocument doc = docs.get(i); + Object id = doc.getFieldValue(uniqueKeyField.getName()); + ShardDoc shardDoc = new ShardDoc(); + shardDoc.id = id; + shardDoc.shard = srsp.getShard(); + shardDoc.orderInShard = i; + Object scoreObj = doc.getFieldValue(SolrReturnFields.SCORE); + if (scoreObj != null) { + if (scoreObj instanceof String) { + shardDoc.score = Float.parseFloat((String) scoreObj); + } else { + shardDoc.score = ((Number) scoreObj).floatValue(); + } + } + if (!scoreDependentFields.isEmpty()) { + shardDoc.scoreDependentFields = doc.getSubsetOfFields(scoreDependentFields); + } + + shardDoc.sortFieldValues = unmarshalledSortFieldValues; + shardDocMap.computeIfAbsent(srsp.getShard(), list -> new ArrayList<>()).add(shardDoc); + String prevShard = uniqueDoc.put(id, srsp.getShard()); Review Comment: Thanks Christine. The amount of code duplication of mergeIds, a massive method, is extreme! Something *must* be done; it'll be a burden to maintain otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
