This is an automated email from the ASF dual-hosted git repository.
sigram pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_10x by this push:
new ce059dc3872 SOLR-18077: Fix incorrect request reuse, fix unit tests.
(cherry-pick from #4396) (#4400)
ce059dc3872 is described below
commit ce059dc3872a32e78c15ab202913a02edc92fa2e
Author: Andrzej Białecki <[email protected]>
AuthorDate: Thu May 7 12:52:34 2026 +0200
SOLR-18077: Fix incorrect request reuse, fix unit tests. (cherry-pick from
#4396) (#4400)
---
solr/cross-dc-manager/build.gradle | 1 +
.../manager/consumer/KafkaCrossDcConsumer.java | 30 +++-
.../crossdc/manager/consumer/PartitionManager.java | 14 +-
.../manager/SolrAndKafkaIntegrationTest.java | 161 +++++++++++++++++++--
4 files changed, 188 insertions(+), 18 deletions(-)
diff --git a/solr/cross-dc-manager/build.gradle
b/solr/cross-dc-manager/build.gradle
index 4ce538f67f7..14ed70a5dbc 100644
--- a/solr/cross-dc-manager/build.gradle
+++ b/solr/cross-dc-manager/build.gradle
@@ -35,6 +35,7 @@ dependencies {
implementation libs.opentelemetry.sdk.metrics
implementation libs.eclipse.jetty.server
implementation libs.eclipse.jetty.ee10.servlet
+ implementation libs.google.guava
implementation libs.jakarta.servlet.api
implementation libs.slf4j.api
runtimeOnly libs.google.protobuf.javautils
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index c3b5bdb3a70..d368151c6af 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -75,6 +75,8 @@ import org.slf4j.LoggerFactory;
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String PROP_TOPIC_DEBUG =
"solr.crossdc.consumer.topic.debug";
+
private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
private final AdminClient adminClient;
private final CountDownLatch startLatch;
@@ -101,6 +103,8 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private volatile boolean running = false;
+ private boolean topicDebug =
Boolean.parseBoolean(System.getProperty(PROP_TOPIC_DEBUG, "false"));
+
/**
* Supplier for creating and managing a working CloudSolrClient instance.
This class ensures that
* the CloudSolrClient instance doesn't try to use its {@link
@@ -175,6 +179,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
conf.get(CrossDcConf.COLLAPSE_UPDATES),
CrossDcConf.CollapseUpdates.PARTIAL);
this.maxCollapseRecords =
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
this.startLatch = startLatch;
+
final Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.put(
@@ -375,6 +380,9 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;
for (TopicPartition partition : records.partitions()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Checking partition {}", partition.partition());
+ }
List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
records.records(partition);
@@ -396,19 +404,31 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
metrics.incrementInputMsgCounter();
lastRecord = requestRecord;
- MirroredSolrRequest<?> req = requestRecord.value();
- SolrRequest<?> solrReq = req.getSolrRequest();
- MirroredSolrRequest.Type type = req.getType();
+ final MirroredSolrRequest<?> req = requestRecord.value();
+ final SolrRequest<?> solrReq = req.getSolrRequest();
+ final MirroredSolrRequest.Type type = req.getType();
if (type != MirroredSolrRequest.Type.UPDATE) {
String action = solrReq.getParams().get("action", "unknown");
metrics.incrementInputReqCounter(type.name(), action);
}
- ModifiableSolrParams params = new
ModifiableSolrParams(solrReq.getParams());
+ final ModifiableSolrParams params = new
ModifiableSolrParams(solrReq.getParams());
if (log.isTraceEnabled()) {
log.trace("-- picked type={}, params={}", req.getType(), params);
}
+ if (topicDebug) {
+ solrReq.addHeader("topic.debug", "true");
+ solrReq.addHeader("record.topic", requestRecord.topic());
+ solrReq.addHeader("record.partition",
String.valueOf(requestRecord.partition()));
+ solrReq.addHeader("record.offset",
String.valueOf(requestRecord.offset()));
+ solrReq.addHeader("record.timestamp",
String.valueOf(requestRecord.timestamp()));
+ solrReq.addHeader("record.key", requestRecord.key());
+ solrReq.addHeader("workUnit.nextOffset",
String.valueOf(workUnit.nextOffset));
+ solrReq.addHeader("workUnit.partition",
String.valueOf(workUnit.partition));
+ solrReq.addHeader("workUnit.topic", workUnit.topic);
+ solrReq.addHeader("workUnit.items",
String.valueOf(workUnit.workItems.size()));
+ }
// determine if it's an UPDATE with deletes, or if the existing
batch has deletes
boolean hasDeletes = false;
@@ -450,6 +470,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
if (updateReqBatch == null) {
// just initialize
updateReqBatch = new UpdateRequest();
+ updateReqBatch.addHeaders(solrReq.getHeaders());
} else {
if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
throw new RuntimeException("Can't collapse requests.");
@@ -490,6 +511,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
if (updateReqBatch != null) {
sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE,
lastRecord, workUnit);
+ updateReqBatch = null;
}
try {
partitionManager.checkForOffsetUpdates(partition);
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
index c1004528ab6..c93740f25ea 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.crossdc.manager.consumer;
+import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.ArrayDeque;
import java.util.HashSet;
@@ -44,13 +45,16 @@ public class PartitionManager {
final Queue<WorkUnit> partitionQueue = new ArrayDeque<>();
}
- static class WorkUnit {
- final TopicPartition partition;
- Set<Future<?>> workItems = new HashSet<>();
+ /**
+ * Groups the work for a single Kafka topic-partition, identified by its partition number and
+ * topic name.
+ *
+ * <p>Public and {@code @VisibleForTesting} so that tests can override {@code
+ * KafkaCrossDcConsumer.sendBatch}, whose signature takes a {@code WorkUnit}.
+ */
+ @VisibleForTesting
+ public static class WorkUnit {
+ // partition number and topic name, copied out of the TopicPartition given to the constructor
+ final int partition;
+ final String topic;
+ // NOTE(review): presumably the futures of the requests submitted for this partition - confirm
+ final Set<Future<?>> workItems = new HashSet<>();
+ // NOTE(review): presumably the Kafka offset to commit once this unit's work completes - confirm
long nextOffset;
- public WorkUnit(TopicPartition partition) {
- this.partition = partition;
+ // package-private constructor: only code in this package creates WorkUnits
+ WorkUnit(TopicPartition partition) {
+ this.partition = partition.partition();
+ this.topic = partition.topic();
}
}
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index e837be5fec3..73b6bd8abd5 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -16,8 +16,12 @@
*/
package org.apache.solr.crossdc.manager;
+import static org.apache.solr.crossdc.common.CrossDcConf.COLLAPSE_UPDATES;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BATCH_SIZE_BYTES;
+import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.BOOTSTRAP_SERVERS;
import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
@@ -28,14 +32,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +53,7 @@ import org.apache.lucene.tests.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -56,14 +65,19 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.manager.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
+import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
+import org.apache.solr.crossdc.manager.consumer.PartitionManager;
import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
import org.junit.After;
import org.junit.Before;
@@ -90,11 +104,64 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
private static final int NUM_BROKERS = 1;
public EmbeddedKafkaCluster kafkaCluster;
+  /**
+   * Captures one batch passed to {@code KafkaCrossDcConsumer.sendBatch} so that tests can inspect
+   * which Kafka partition it came from and which documents it carried. Topic and partition are read
+   * from the {@code record.*} headers that the consumer adds when {@link
+   * KafkaCrossDcConsumer#PROP_TOPIC_DEBUG} is enabled.
+   */
+  private static class ConsumerBatch {
+    final String kafkaTopic;
+    final int partitionId;
+    final MirroredSolrRequest.Type type;
+    final String collection;
+    final Map<String, String> headers;
+    /** Ids of the documents added by this batch; empty for non-update requests. */
+    final Set<String> addIds = new HashSet<>();
+    final String json;
+
+    public ConsumerBatch(final MirroredSolrRequest.Type type, final SolrRequest<?> solrRequest) {
+      // Copy the headers so that later changes to the request cannot alter this snapshot.
+      // NOTE(review): also tolerates getHeaders() returning null when no header was ever set.
+      final Map<String, String> requestHeaders = solrRequest.getHeaders();
+      this.headers = requestHeaders == null ? new HashMap<>() : new HashMap<>(requestHeaders);
+      this.kafkaTopic = headers.get("record.topic");
+      final String partition = headers.get("record.partition");
+      if (partition == null) {
+        // fail with context instead of an opaque NumberFormatException from parseInt(null)
+        throw new IllegalStateException(
+            "missing 'record.partition' header, is "
+                + KafkaCrossDcConsumer.PROP_TOPIC_DEBUG
+                + " enabled? headers="
+                + headers);
+      }
+      this.partitionId = Integer.parseInt(partition);
+      this.type = type;
+      this.collection = solrRequest.getCollection();
+      if (solrRequest instanceof UpdateRequest) {
+        UpdateRequest updateReq = (UpdateRequest) solrRequest;
+        // NOTE(review): UpdateRequest.getDocuments() appears to return null when no documents
+        // were added (e.g. delete-only requests); treat that as "no documents" so that capturing
+        // such a batch cannot throw inside sendBatch.
+        List<SolrInputDocument> docs = updateReq.getDocuments();
+        if (docs == null) {
+          docs = List.of();
+        }
+        json = Utils.toJSONString(Map.of("params", updateReq.getParams(), "add", docs));
+        docs.forEach(doc -> addIds.add(doc.getFieldValue("id").toString()));
+      } else {
+        json =
+            Utils.toJSONString(
+                Map.of("params", solrRequest.getParams(), "class", solrRequest.getClass()));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "ConsumerBatch{"
+          + "kafkaTopic='"
+          + kafkaTopic
+          + '\''
+          + ", partitionId="
+          + partitionId
+          + ", type="
+          + type
+          + ", collection='"
+          + collection
+          + '\''
+          + ", headers="
+          + headers
+          + ", json='"
+          + json
+          + '\''
+          + '}';
+    }
+  }
+
protected volatile MiniSolrCloudCluster solrCluster1;
protected volatile MiniSolrCloudCluster solrCluster2;
protected volatile Consumer consumer;
+ private List<ConsumerBatch> consumerBatches;
+
private static final String TOPIC = "topic1";
private static final String COLLECTION = "collection1";
@@ -112,7 +179,28 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
Thread.setDefaultUncaughtExceptionHandler(
(t, e) -> log.error("Uncaught exception in thread {}", t, e));
System.setProperty("otel.metrics.exporter", "prometheus");
- consumer = new Consumer();
+ System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
+ consumerBatches = new ArrayList<>();
+ consumer =
+ new Consumer() {
+ @Override
+ protected CrossDcConsumer getCrossDcConsumer(
+ final KafkaCrossDcConf conf,
+ final ConsumerMetrics metrics,
+ final CountDownLatch startLatch) {
+ return new KafkaCrossDcConsumer(conf, metrics, startLatch) {
+ @Override
+ public void sendBatch(
+ final SolrRequest<? extends SolrResponse> solrReqBatch,
+ final MirroredSolrRequest.Type type,
+ final ConsumerRecord<String, MirroredSolrRequest<?>>
lastRecord,
+ final PartitionManager.WorkUnit workUnit) {
+ consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
+ super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
+ }
+ };
+ }
+ };
Properties config = new Properties();
kafkaCluster =
@@ -124,13 +212,15 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
};
kafkaCluster.start();
- kafkaCluster.createTopic(TOPIC, 10, 1);
+ // create many partitions to test for re-ordered reads
+ kafkaCluster.createTopic(TOPIC, 3, 1);
// ensure small batches to test multi-partition ordering
- System.setProperty("batchSizeBytes", "128");
- System.setProperty("solr.crossdc.topicName", TOPIC);
- System.setProperty("solr.crossdc.bootstrapServers",
kafkaCluster.bootstrapServers());
+ System.setProperty(BATCH_SIZE_BYTES, "100");
+ System.setProperty(TOPIC_NAME, TOPIC);
+ System.setProperty(BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+ System.setProperty(COLLAPSE_UPDATES, "none");
solrCluster1 =
configureCluster(1).addConfig("conf",
getFile("configs/cloud-minimal/conf")).configure();
@@ -238,10 +328,62 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam,
quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non
proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
@Test
- @Ignore("SOLR-18077")
+  public void testPartitioning() throws Exception {
+    // Indexes the same documents into two collections, then checks (using the batches captured
+    // from the consumer) that each Kafka partition only carried requests for one collection and
+    // that every document of both collections was seen by the consumer.
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+    create.process(solrCluster1.getSolrClient());
+    create.process(solrCluster2.getSolrClient());
+    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    final int numDocs = 200;
+    final List<String> collections = List.of(COLLECTION, ALT_COLLECTION);
+    for (int i = 0; i < numDocs; i++) {
+      // alternate between the collections so that their updates are interleaved
+      for (String collection : collections) {
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "id-" + i);
+        doc.addField("id_i", i);
+        doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
+        doc.addField("collection_t", collection);
+        client.add(collection, doc);
+      }
+    }
+    for (String collection : collections) {
+      client.commit(collection);
+    }
+
+    // The consumer mirrors asynchronously, so returning from the commits above does not mean it
+    // has sent every record yet. Poll (up to ~30s) until the captured batches cover all docs of
+    // both collections; if that never happens, the assertions below report what is missing.
+    List<ConsumerBatch> batches = snapshotConsumerBatches();
+    for (int attempt = 0;
+        attempt < 100 && !allDocsCaptured(batches, collections, numDocs);
+        attempt++) {
+      Thread.sleep(300);
+      batches = snapshotConsumerBatches();
+    }
+
+    // every Kafka partition must only ever carry requests for a single collection
+    Map<Integer, String> collectionByPartition = new HashMap<>();
+    Map<String, Set<String>> idsByCollection = new HashMap<>();
+    for (ConsumerBatch batch : batches) {
+      String expectedCollection =
+          collectionByPartition.computeIfAbsent(batch.partitionId, k -> batch.collection);
+      assertEquals(
+          "request in partition "
+              + batch.partitionId
+              + " has wrong collection "
+              + batch.collection
+              + ": "
+              + batch
+              + "\npartitions: "
+              + collectionByPartition,
+          expectedCollection,
+          batch.collection);
+      idsByCollection.computeIfAbsent(batch.collection, c -> new HashSet<>()).addAll(batch.addIds);
+    }
+    // Check each expected collection explicitly: iterating idsByCollection alone would pass
+    // vacuously for a collection that has no captured batches at all.
+    for (String collection : collections) {
+      assertEquals(
+          "incorrect count in collection " + collection,
+          numDocs,
+          idsByCollection.getOrDefault(collection, Set.of()).size());
+    }
+  }
+
+  /**
+   * Returns a private copy of {@code consumerBatches}, which the overridden {@code sendBatch}
+   * keeps appending to while the test runs, so callers can iterate it without risking a {@code
+   * ConcurrentModificationException}.
+   */
+  private List<ConsumerBatch> snapshotConsumerBatches() {
+    // NOTE(review): consumerBatches is a plain ArrayList, presumably appended to from the
+    // consumer's own thread; creating it with Collections.synchronizedList(...) in setUp would
+    // make this copy fully race-free. Null slots (possible when the copy races with an append)
+    // are dropped.
+    List<ConsumerBatch> snapshot = new ArrayList<>(consumerBatches);
+    snapshot.removeIf(batch -> batch == null);
+    return snapshot;
+  }
+
+  /** Whether {@code batches} add at least {@code numDocs} distinct ids to every collection. */
+  private static boolean allDocsCaptured(
+      List<ConsumerBatch> batches, List<String> collections, int numDocs) {
+    for (String collection : collections) {
+      Set<String> ids = new HashSet<>();
+      for (ConsumerBatch batch : batches) {
+        if (collection.equals(batch.collection)) {
+          ids.addAll(batch.addIds);
+        }
+      }
+      if (ids.size() < numDocs) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+ @Test
public void testStrictOrdering() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient();
- int NUM_DOCS = 5000;
+ int NUM_DOCS = 1000;
// delay deletes by this many docs
int DELTA = 100;
for (int i = 0; i < NUM_DOCS; i++) {
@@ -454,11 +596,12 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
boolean foundUpdates = false;
for (int i = 0; i < 100; i++) {
client.commit(collection);
- results = client.query(collection, new SolrQuery(query));
+ results =
+ client.query(collection, new SolrQuery(CommonParams.Q, query,
CommonParams.FL, "*"));
if (results.getResults().getNumFound() == expectedNumDocs) {
foundUpdates = true;
} else {
- Thread.sleep(200);
+ Thread.sleep(300);
}
}