This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 6f41a7884ba SOLR-17973: Fix `shards.preference` not respected for
cross-collection join queries (#4218)
6f41a7884ba is described below
commit 6f41a7884bac47be908635798351e636f603b7d9
Author: Khush Jain <[email protected]>
AuthorDate: Tue Mar 17 20:57:16 2026 -0400
SOLR-17973: Fix `shards.preference` not respected for cross-collection join
queries (#4218)
---
...fix-shards-preference-cross-collection-join.yml | 7 +
.../search/join/CrossCollectionJoinQParser.java | 7 +
.../solr/search/join/CrossCollectionJoinQuery.java | 22 +-
.../search/join/CrossCollectionJoinQueryTest.java | 298 +++++++++++++++++++++
4 files changed, 333 insertions(+), 1 deletion(-)
diff --git a/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml b/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml
new file mode 100644
index 00000000000..1620764ea06
--- /dev/null
+++ b/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml
@@ -0,0 +1,7 @@
+title: "SOLR-17973: Fix `shards.preference` not respected for cross-collection join queries"
+type: fixed
+authors:
+ - name: khushjain
+links:
+ - name: SOLR-17973
+ url: https://issues.apache.org/jira/browse/SOLR-17973
diff --git a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java
index 1b78a0cb2e0..2e4675a880d 100644
--- a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java
+++ b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.Set;
import org.apache.lucene.search.Query;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
@@ -102,6 +103,12 @@ public class CrossCollectionJoinQParser extends QParser {
}
}
+ // Propagate shards.preference from request-level params if not already
set in localParams
+ String shardsPreference =
req.getParams().get(ShardParams.SHARDS_PREFERENCE);
+ if (shardsPreference != null &&
otherParams.get(ShardParams.SHARDS_PREFERENCE) == null) {
+ otherParams.set(ShardParams.SHARDS_PREFERENCE, shardsPreference);
+ }
+
return new CrossCollectionJoinQuery(
query, zkHost, solrUrl, collection, fromField, toField,
routedByJoinKey, ttl, otherParams);
}
diff --git a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java
index bb03022afdf..8381ed17dbb 100644
--- a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java
+++ b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java
@@ -48,11 +48,14 @@ import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import
org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import
org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -219,11 +222,13 @@ public class CrossCollectionJoinQuery extends Query implements SolrSearcherRequi
}
private TupleStream createCloudSolrStream(SolrClientCache solrClientCache)
throws IOException {
+ ZkController zkController =
searcher.getCore().getCoreContainer().getZkController();
+
String streamZkHost;
if (zkHost != null) {
streamZkHost = zkHost;
} else {
- streamZkHost =
searcher.getCore().getCoreContainer().getZkController().getZkServerAddress();
+ streamZkHost = zkController.getZkServerAddress();
}
ModifiableSolrParams params = new ModifiableSolrParams(otherParams);
@@ -239,6 +244,21 @@ public class CrossCollectionJoinQuery extends Query implements SolrSearcherRequi
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);
+ streamContext.setRequestParams(new ModifiableSolrParams(otherParams));
+ if (zkController != null) {
+ RequestReplicaListTransformerGenerator rltg =
+ new RequestReplicaListTransformerGenerator(
+ zkController
+ .getZkStateReader()
+ .getClusterProperties()
+ .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
+ .toString(),
+ zkController.getNodeName(),
+ zkController.getBaseUrl(),
+ zkController.getHostName(),
+ zkController.getSysPropsCacher());
+ streamContext.setRequestReplicaListTransformerGenerator(rltg);
+ }
TupleStream cloudSolrStream = new CloudSolrStream(streamZkHost,
collection, params);
TupleStream uniqueStream = new UniqueStream(cloudSolrStream, new
FieldEqualitor(fromField));
diff --git a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
index 91cbc3bbd8b..70d77363539 100644
--- a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
+++ b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import org.apache.lucene.search.Query;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -30,8 +31,16 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
import org.apache.solr.embedded.JettySolrRunner;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequestBase;
+import org.apache.solr.search.QueryParsing;
+import org.apache.solr.util.SolrJMetricTestUtils;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -305,6 +314,295 @@ public class CrossCollectionJoinQueryTest extends SolrCloudTestCase {
}
}
+  @Test
+  public void testShardsPreferenceRequestParamPropagation() throws Exception {
+    // A shards.preference supplied only at the request level (absent from the local params)
+    // is expected to be copied into the parsed query's otherParams.
+    ModifiableSolrParams joinLocalParams = new ModifiableSolrParams();
+    joinLocalParams.set(QueryParsing.V, "*:*");
+    joinLocalParams.set(CrossCollectionJoinQParser.FROM_INDEX, "products");
+    joinLocalParams.set(CrossCollectionJoinQParser.FROM, "product_id_s");
+    joinLocalParams.set(CrossCollectionJoinQParser.TO, "product_id_s");
+    joinLocalParams.set(CrossCollectionJoinQParser.ROUTED_BY_JOIN_KEY, "false");
+
+    ModifiableSolrParams reqParams = new ModifiableSolrParams();
+    reqParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false");
+
+    try (SolrQueryRequest request = new SolrQueryRequestBase(null, reqParams) {}) {
+      Query parsed =
+          new CrossCollectionJoinQParser(
+                  null, joinLocalParams, reqParams, request, "product_id_s", null)
+              .parse();
+      String propagated =
+          ((CrossCollectionJoinQuery) parsed).otherParams.get(ShardParams.SHARDS_PREFERENCE);
+      assertEquals("replica.leader:false", propagated);
+    }
+  }
+
+  @Test
+  public void testShardsPreferenceLocalParamTakesPrecedence() throws Exception {
+    // The request carries one shards.preference and the local params carry another;
+    // the parser is expected to keep the local-params value rather than overwrite it.
+    ModifiableSolrParams reqParams = new ModifiableSolrParams();
+    reqParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false");
+
+    ModifiableSolrParams joinLocalParams = new ModifiableSolrParams();
+    joinLocalParams.set(QueryParsing.V, "*:*");
+    joinLocalParams.set(CrossCollectionJoinQParser.FROM_INDEX, "products");
+    joinLocalParams.set(CrossCollectionJoinQParser.FROM, "product_id_s");
+    joinLocalParams.set(CrossCollectionJoinQParser.TO, "product_id_s");
+    joinLocalParams.set(CrossCollectionJoinQParser.ROUTED_BY_JOIN_KEY, "false");
+    joinLocalParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:true");
+
+    try (SolrQueryRequest request = new SolrQueryRequestBase(null, reqParams) {}) {
+      CrossCollectionJoinQParser ccjParser =
+          new CrossCollectionJoinQParser(
+              null, joinLocalParams, reqParams, request, "product_id_s", null);
+      Query parsed = ccjParser.parse();
+      CrossCollectionJoinQuery joinQuery = (CrossCollectionJoinQuery) parsed;
+      String effective = joinQuery.otherParams.get(ShardParams.SHARDS_PREFERENCE);
+      // The local-params value must win over the request-level one
+      assertEquals("replica.leader:true", effective);
+    }
+  }
+
+ /**
+ * Verifies end-to-end that a request-level {@code shards.preference} of
+ * {@code replica.leader:false} and then {@code replica.leader:true} decides which replica of the
+ * "from" collection serves the cross-collection join's /export stream, by comparing per-node
+ * /export request counts before and after each query.
+ */
+ @Test
+ public void testShardsPreferenceWithCrossCollectionJoin() throws Exception {
+ // Use 1 shard with 2 replicas for the "from" collection so there is exactly
+ // 1 leader and 1 non-leader, each on a different node. This lets us verify
+ // via per-node /export metrics which replica actually served the stream request.
+ final String fromCollection = "products_pref_test";
+ final String toCollection = "parts_pref_test";
+ try {
+ CollectionAdminRequest.createCollection(fromCollection, "ccjoin", 1, 2)
+ .process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(toCollection, "ccjoin", 1, 1)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(fromCollection, 1, 2);
+ cluster.waitForActiveCollection(toCollection, 1, 1);
+
+ // Index test data
+ List<SolrInputDocument> productDocs = new ArrayList<>();
+ List<SolrInputDocument> partDocs = new ArrayList<>();
+ for (int productId = 0; productId < NUM_PRODUCTS; ++productId) {
+ int sizeNum = productId % SIZES.length;
+ String size = SIZES[sizeNum];
+ productDocs.add(
+ new SolrInputDocument(
+ "id", String.valueOf(productId),
+ "product_id_s", String.valueOf(productId),
+ "size_s", size));
+ for (int partNum = 0; partNum <= sizeNum; partNum++) {
+ String partId = String.format(Locale.ROOT, "%d_%d", productId,
partNum);
+ partDocs.add(
+ new SolrInputDocument("id", partId, "product_id_s",
String.valueOf(productId)));
+ }
+ }
+ indexDocs(fromCollection, productDocs);
+ cluster.getSolrClient().commit(fromCollection);
+ indexDocs(toCollection, partDocs);
+ cluster.getSolrClient().commit(toCollection);
+
+ // Identify leader and non-leader replicas for the "from" collection's single shard
+ DocCollection fromDocCollection =
+
cluster.getSolrClient().getClusterState().getCollection(fromCollection);
+ Slice shard = fromDocCollection.getSlices().iterator().next();
+ Replica leader = shard.getLeader();
+ assertNotNull("Leader should exist for shard", leader);
+ // Any replica whose name differs from the leader's is treated as the non-leader
+ Replica nonLeader =
+ shard.getReplicas().stream()
+ .filter(r -> !r.getName().equals(leader.getName()))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Expected a non-leader
replica"));
+
+ String leaderBaseUrl = leader.getBaseUrl();
+ String nonLeaderBaseUrl = nonLeader.getBaseUrl();
+ // Per-node metrics can only distinguish the two replicas if they live on different nodes
+ assertNotEquals(
+ "Leader and non-leader should be on different nodes for this test to
be meaningful",
+ leaderBaseUrl,
+ nonLeaderBaseUrl);
+
+ // --- Test 1: replica.leader:false should route /export to the non-leader ---
+ // Snapshot both nodes' /export counts for the "from" collection before querying.
+ double leaderCountBefore = getNumExportRequests(leaderBaseUrl,
fromCollection);
+ double nonLeaderCountBefore = getNumExportRequests(nonLeaderBaseUrl,
fromCollection);
+
+ // This params object is reused for Test 2; only shards.preference is changed there.
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(
+ "q",
+ String.format(
+ Locale.ROOT,
+ "{!join method=crossCollection fromIndex=%s from=product_id_s
to=product_id_s routed=false ttl=0}size_s:M",
+ fromCollection));
+ params.set("rows", "0");
+ params.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false");
+
+ QueryResponse resp = cluster.getSolrClient().query(toCollection, params);
+ assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound());
+
+ double leaderCountAfter = getNumExportRequests(leaderBaseUrl,
fromCollection);
+ double nonLeaderCountAfter = getNumExportRequests(nonLeaderBaseUrl,
fromCollection);
+
+ assertTrue(
+ "Non-leader replica should have received the /export request"
+ + " (before="
+ + nonLeaderCountBefore
+ + ", after="
+ + nonLeaderCountAfter
+ + ")",
+ nonLeaderCountAfter > nonLeaderCountBefore);
+ // Delta 0.0: the leader's count must be exactly unchanged
+ assertEquals(
+ "Leader replica should NOT have received the /export request",
+ leaderCountBefore,
+ leaderCountAfter,
+ 0.0);
+
+ // --- Test 2: replica.leader:true should route /export to the leader ---
+ // The counts observed after Test 1 become the baseline for Test 2.
+ leaderCountBefore = leaderCountAfter;
+ nonLeaderCountBefore = nonLeaderCountAfter;
+
+ params.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:true");
+ resp = cluster.getSolrClient().query(toCollection, params);
+ assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound());
+
+ leaderCountAfter = getNumExportRequests(leaderBaseUrl, fromCollection);
+ nonLeaderCountAfter = getNumExportRequests(nonLeaderBaseUrl,
fromCollection);
+
+ assertTrue(
+ "Leader replica should have received the /export request"
+ + " (before="
+ + leaderCountBefore
+ + ", after="
+ + leaderCountAfter
+ + ")",
+ leaderCountAfter > leaderCountBefore);
+ // Delta 0.0: the non-leader's count must be exactly unchanged
+ assertEquals(
+ "Non-leader replica should NOT have received the /export request",
+ nonLeaderCountBefore,
+ nonLeaderCountAfter,
+ 0.0);
+ } finally {
+ // Drop both per-test collections regardless of the test outcome
+
CollectionAdminRequest.deleteCollection(toCollection).process(cluster.getSolrClient());
+
CollectionAdminRequest.deleteCollection(fromCollection).process(cluster.getSolrClient());
+ }
+ }
+
+  /**
+   * Verifies that {@code shards.preference=replica.location:local} routes the cross-collection
+   * join's /export stream to the "from" replica on the same node where the join query is
+   * processed.
+   *
+   * <p>The "from" collection is created with NUM_NODES replicas, intended to give one replica per
+   * node. The "to" collection has a single replica. The test compares the /export request counts
+   * for the "from" collection on every node before and after the join query. This also checks
+   * that the RequestReplicaListTransformerGenerator is initialized with full node context
+   * (nodeName, baseUrl, hostName).
+   */
+  @Test
+  public void testShardsPreferenceLocationLocal() throws Exception {
+    final String fromCollection = "products_local_test";
+    final String toCollection = "parts_local_test";
+    try {
+      // "from" collection: 1 shard, NUM_NODES replicas → one replica on every node
+      CollectionAdminRequest.createCollection(fromCollection, "ccjoin", 1, NUM_NODES)
+          .process(cluster.getSolrClient());
+      // "to" collection: 1 shard, 1 replica → on exactly one node
+      CollectionAdminRequest.createCollection(toCollection, "ccjoin", 1, 1)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(fromCollection, 1, NUM_NODES);
+      cluster.waitForActiveCollection(toCollection, 1, 1);
+
+      // Index test data
+      List<SolrInputDocument> productDocs = new ArrayList<>();
+      List<SolrInputDocument> partDocs = new ArrayList<>();
+      for (int productId = 0; productId < NUM_PRODUCTS; ++productId) {
+        int sizeNum = productId % SIZES.length;
+        String size = SIZES[sizeNum];
+        productDocs.add(
+            new SolrInputDocument(
+                "id", String.valueOf(productId),
+                "product_id_s", String.valueOf(productId),
+                "size_s", size));
+        for (int partNum = 0; partNum <= sizeNum; partNum++) {
+          String partId = String.format(Locale.ROOT, "%d_%d", productId, partNum);
+          partDocs.add(
+              new SolrInputDocument("id", partId, "product_id_s", String.valueOf(productId)));
+        }
+      }
+      indexDocs(fromCollection, productDocs);
+      cluster.getSolrClient().commit(fromCollection);
+      indexDocs(toCollection, partDocs);
+      cluster.getSolrClient().commit(toCollection);
+
+      // Find the node hosting the "to" collection's shard. The join stream will execute
+      // on this node, so replica.location:local should prefer the "from" replica here.
+      DocCollection toDocCollection =
+          cluster.getSolrClient().getClusterState().getCollection(toCollection);
+      Slice toShard = toDocCollection.getSlices().iterator().next();
+      String toNodeBaseUrl = toShard.getReplicas().iterator().next().getBaseUrl();
+
+      // Partition the "from" replicas once: record whether one is co-located with the "to"
+      // replica, and collect the remote base URLs. The baseline and verification loops below
+      // both iterate this single list, so they cannot drift out of alignment.
+      DocCollection fromDocCollection =
+          cluster.getSolrClient().getClusterState().getCollection(fromCollection);
+      Slice fromShard = fromDocCollection.getSlices().iterator().next();
+      boolean hasLocalFromReplica = false;
+      List<String> remoteBaseUrls = new ArrayList<>();
+      for (Replica r : fromShard.getReplicas()) {
+        String baseUrl = r.getBaseUrl();
+        if (baseUrl.equals(toNodeBaseUrl)) {
+          hasLocalFromReplica = true;
+        } else {
+          remoteBaseUrls.add(baseUrl);
+        }
+      }
+      assertTrue(
+          "The 'from' collection should have a replica on the same node as the 'to' collection",
+          hasLocalFromReplica);
+
+      // Baseline /export request counts for the "from" collection, taken before the query.
+      // remoteCountsBefore.get(i) is the baseline for remoteBaseUrls.get(i).
+      double localCountBefore = getNumExportRequests(toNodeBaseUrl, fromCollection);
+      List<Double> remoteCountsBefore = new ArrayList<>(remoteBaseUrls.size());
+      for (String baseUrl : remoteBaseUrls) {
+        remoteCountsBefore.add(getNumExportRequests(baseUrl, fromCollection));
+      }
+
+      // Execute join query with replica.location:local
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(
+          "q",
+          String.format(
+              Locale.ROOT,
+              "{!join method=crossCollection fromIndex=%s from=product_id_s to=product_id_s routed=false ttl=0}size_s:M",
+              fromCollection));
+      params.set("rows", "0");
+      params.set(ShardParams.SHARDS_PREFERENCE, "replica.location:local");
+
+      QueryResponse resp = cluster.getSolrClient().query(toCollection, params);
+      assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound());
+
+      // Verify the local node's "from" replica received the /export request
+      double localCountAfter = getNumExportRequests(toNodeBaseUrl, fromCollection);
+      assertTrue(
+          "Local 'from' replica should have received the /export request"
+              + " (before="
+              + localCountBefore
+              + ", after="
+              + localCountAfter
+              + ")",
+          localCountAfter > localCountBefore);
+
+      // Verify remote nodes did NOT receive /export requests (delta 0.0: exactly unchanged)
+      for (int i = 0; i < remoteBaseUrls.size(); i++) {
+        String baseUrl = remoteBaseUrls.get(i);
+        double remoteCountBefore = remoteCountsBefore.get(i);
+        double remoteCountAfter = getNumExportRequests(baseUrl, fromCollection);
+        assertEquals(
+            "Remote 'from' replica on " + baseUrl + " should NOT have received /export request",
+            remoteCountBefore,
+            remoteCountAfter,
+            0.0);
+      }
+    } finally {
+      CollectionAdminRequest.deleteCollection(toCollection).process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection(fromCollection).process(cluster.getSolrClient());
+    }
+  }
+
+  /**
+   * Looks up the {@code /export} request metric for {@code collectionName} on the node at
+   * {@code baseUrl} by delegating to {@link SolrJMetricTestUtils#getNumCoreRequests} with the
+   * {@code "QUERY"} category and the {@code "/export"} handler path.
+   */
+  private static double getNumExportRequests(String baseUrl, String collectionName)
+      throws SolrServerException, IOException {
+    final String category = "QUERY";
+    final String handlerPath = "/export";
+    return SolrJMetricTestUtils.getNumCoreRequests(baseUrl, collectionName, category, handlerPath);
+  }
+
public void testCcJoinQuery(String query, boolean expectFullResults) throws
Exception {
assertResultCount("parts", query, NUM_PRODUCTS / 2, expectFullResults);
}