This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_10x by this push:
     new 7830acb8539 SOLR-18077: CrossDC Consumer - out-of-order Kafka 
partition processing (cherry-pick from #4125) (#4152)
7830acb8539 is described below

commit 7830acb8539424b4f5d6c5e9fff83beea21ada70
Author: Andrzej BiaƂecki <[email protected]>
AuthorDate: Fri Feb 20 17:29:04 2026 +0100

    SOLR-18077: CrossDC Consumer - out-of-order Kafka partition processing 
(cherry-pick from #4125) (#4152)
---
 changelog/unreleased/solr-18077.yml                |  9 +++++
 .../manager/SolrAndKafkaIntegrationTest.java       | 41 +++++++++++++++++++++-
 .../solr/crossdc/common/KafkaMirroringSink.java    |  5 ++-
 .../pages/cross-dc-replication.adoc                |  8 +++--
 4 files changed, 59 insertions(+), 4 deletions(-)

diff --git a/changelog/unreleased/solr-18077.yml 
b/changelog/unreleased/solr-18077.yml
new file mode 100644
index 00000000000..a4747905be7
--- /dev/null
+++ b/changelog/unreleased/solr-18077.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: "CrossDC Consumer: fix potential out-of-order Kafka partition 
processing"
+type: fixed # added, changed, fixed, deprecated, removed, dependency_update, 
security, other
+authors:
+  - name: Andrzej Bialecki
+    nick: ab
+links:
+  - name: SOLR-18077
+    url: https://issues.apache.org/jira/browse/SOLR-18077
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 39970d7b4ca..e30ff999a0b 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -33,6 +34,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -122,8 +124,10 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
         };
     kafkaCluster.start();
 
-    kafkaCluster.createTopic(TOPIC, 1, 1);
+    kafkaCluster.createTopic(TOPIC, 10, 1);
 
+    // ensure small batches to test multi-partition ordering
+    System.setProperty("batchSizeBytes", "128");
     System.setProperty("solr.crossdc.topicName", TOPIC);
     System.setProperty("solr.crossdc.bootstrapServers", 
kafkaCluster.bootstrapServers());
     System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
@@ -183,6 +187,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     Thread.setDefaultUncaughtExceptionHandler(uceh);
   }
 
+  @Test
   public void testFullCloudToCloud() throws Exception {
     CloudSolrClient client = solrCluster1.getSolrClient(COLLECTION);
     SolrInputDocument doc = new SolrInputDocument();
@@ -198,6 +203,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
   }
 
+  @Test
   public void testProducerToCloud() throws Exception {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
@@ -228,6 +234,39 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     producer.close();
   }
 
+  private static final String LOREM_IPSUM =
+      "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod 
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, 
quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo 
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse 
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non 
proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
+
+  @Test
+  public void testStrictOrdering() throws Exception {
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    int NUM_DOCS = 5000;
+    // delay deletes by this many docs
+    int DELTA = 100;
+    for (int i = 0; i < NUM_DOCS; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "id-" + i);
+      doc.addField("text", "some test with a relatively long field. " + 
LOREM_IPSUM);
+
+      client.add(COLLECTION, doc);
+      if (i >= DELTA) {
+        client.deleteById(COLLECTION, "id-" + (i - DELTA));
+      }
+    }
+
+    // send the remaining deletes in random order
+    ArrayList<Integer> ids = new ArrayList<>(DELTA);
+    IntStream.range(0, DELTA).forEach(i -> ids.add(i));
+    Collections.shuffle(ids, random());
+    for (Integer id : ids) {
+      client.deleteById(COLLECTION, "id-" + (NUM_DOCS - DELTA + id));
+    }
+
+    client.commit(COLLECTION);
+
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 0);
+  }
+
   @Test
   @Ignore("This relies on collection properties and I don't see where they are 
read anymore")
   public void testMirroringUpdateProcessor() throws Exception {
diff --git 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 1a8ad622c0b..70f48457814 100644
--- 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -92,12 +92,15 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
     }
 
     final long enqueueStartNanos = System.nanoTime();
+    // required for multi-partition topics to preserve ordering of requests 
for a collection
+    final String recordKey =
+        request.getSolrRequest() != null ? 
request.getSolrRequest().getCollection() : null;
 
     // Create Producer record
     try {
 
       producer.send(
-          new ProducerRecord<>(topicName, request),
+          new ProducerRecord<>(topicName, recordKey, request),
           (metadata, exception) -> {
             if (exception != null) {
               log.error(
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc 
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
index bb235dcdc7c..07e1b52ecc1 100644
--- 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
+++ 
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
@@ -130,7 +130,7 @@ Optional configuration properties:
 `solr.crossdc.retryBackoffMs` _<integer>_:: The amount of time to wait before 
attempting to retry a failed request to a given topic partition.
 `solr.crossdc.deliveryTimeoutMS` _<integer>_:: Updates sent to the Kafka queue 
will be failed before the number of retries has been exhausted if the timeout 
configured by delivery.timeout.ms expires first
 `solr.crossdc.maxRequestSizeBytes` _<integer>_:: The maximum size of a Kafka 
queue request in bytes - limits the number of requests that will be sent over 
the queue in a single batch.
-`solr.crossdc.dlqTopicName` _<string>_: If not empty then requests that failed 
processing `maxAttempts` times will be sent to a "dead letter queue" topic in 
Kafka (must exist if configured).
+`solr.crossdc.dlqTopicName` _<string>_:: If not empty then requests that 
failed processing `maxAttempts` times will be sent to a "dead letter queue" 
topic in Kafka (must exist if configured).
 `solr.crossdc.mirrorCommits` _<boolean>_:: If `true` then standalone commit 
requests will be mirrored, otherwise they will be processed only locally.
 `solr.crossdc.expandDbq` _<enum>_ :: If set to `expand` (default) then 
Delete-By-Query will be expanded before mirroring into series of Delete-By-Id, 
which may help with correct processing of out-of-order requests on the consumer 
side.
 If set to `none` then Delete-By-Query requests will be mirrored as-is.
@@ -212,4 +212,8 @@ Setting the `solr.crossdc.enabled` system property or 
xref:collection-management
 - When `solr.crossdc.expandDbq` property is set to `expand` (default) then 
Delete-By-Query converts to a series of Delete-By-Id, which can be much less 
efficient for queries matching large numbers of documents.
 Setting this property to `none` results in forwarding a real Delete-By-Query - 
this reduces the amount of data to mirror but may cause different results due 
to the potential re-ordering of failed & re-submitted requests between Consumer 
and the target Solr.
 - When `solr.crossdc.collapseUpdates` is set to `all` then multiple requests 
containing a mix of add and delete ops will be collapsed into a single outgoing 
request.
-This will cause the original ordering of add / delete ops to be lost (because 
Solr processing of an update request always processes all add ops first, and 
only then the delete ops), which may affect the final outcome when some of the 
ops refer to the same document ids.
\ No newline at end of file
+This will cause the original ordering of add / delete ops to be lost (because 
Solr processing of an update request always processes all add ops first, and 
only then the delete ops), which may affect the final outcome when some of the 
ops refer to the same document ids.
+- When the Kafka topic used for mirroring has multiple partitions the CrossDC 
Producer and Consumer guarantee strict ordering of updates ONLY within the same 
collection.
+In other words, when a multi-partition topic is used for mirroring there's no 
guarantee of a strict global request ordering across
+collections, which normally should not be an issue. However, if a strict 
global ordering across collections is required then
+the mirroring topic must use a single partition.

Reply via email to