This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 35fc2c2 Add the request back to the queue on fail, not just the last
record. (#75)
35fc2c2 is described below
commit 35fc2c2241d8e3836622fecb08d5729cfdf89970
Author: Mark Robert Miller <[email protected]>
AuthorDate: Tue Oct 3 17:11:56 2023 -0500
Add the request back to the queue on fail, not just the last record. (#75)
---
.../crossdc/consumer/KafkaCrossDcConsumer.java | 2 +-
.../crossdc/consumer/KafkaCrossDcConsumerTest.java | 5 +-
crossdc-producer/build.gradle | 1 -
.../solr/crossdc/RetryQueueIntegrationTest.java | 60 ++++++++++++++++++++--
4 files changed, 59 insertions(+), 9 deletions(-)
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 1327a42..a221edc 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -329,7 +329,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.trace("result=failed-resubmit");
}
metrics.counter("failed-resubmit").inc();
- kafkaMirroringSink.submit(record.value());
+ kafkaMirroringSink.submit(result.newItem());
break;
case HANDLED:
// no-op
diff --git
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 4d503c0..709472c 100644
---
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -168,7 +168,8 @@ public class KafkaCrossDcConsumerTest {
// Set up the SolrMessageProcessor mock
SolrMessageProcessor mockMessageProcessor =
mock(SolrMessageProcessor.class);
- IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new
IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
+ MirroredSolrRequest request = new MirroredSolrRequest(new
UpdateRequest());
+ IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new
IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null,
request);
when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
// Mock the KafkaMirroringSink
@@ -181,7 +182,7 @@ public class KafkaCrossDcConsumerTest {
consumer.processResult(record, failedResubmitResult);
// Verify that the KafkaMirroringSink.submit() method was called
- verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
+ verify(consumer.kafkaMirroringSink, times(1)).submit(request);
}
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 3335a01..5d91ced 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -39,7 +39,6 @@ dependencies {
provided group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
- testImplementation 'org.slf4j:slf4j-api:2.0.5'
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
testImplementation('org.mockito:mockito-inline:5.1.1')
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 6d2d543..3877dfc 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -180,7 +180,57 @@ import java.util.Properties;
}
@Test
- public void testRetryQueue() throws Exception {
+ public void testRetryQueueSolrDown() throws Exception {
+ solrCluster2.getJettySolrRunner(0).stop();
+
+ CloudSolrClient client = solrCluster1.getSolrClient();
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.nanoTime()));
+ doc.addField("text", "some test");
+
+ client.add(doc);
+
+ SolrInputDocument doc2 = new SolrInputDocument();
+ doc2.addField("id", String.valueOf(System.nanoTime()));
+ doc2.addField("text", "some test");
+
+ client.add(doc2);
+
+ SolrInputDocument doc3 = new SolrInputDocument();
+ doc3.addField("id", String.valueOf(System.nanoTime()));
+ doc3.addField("text", "some test");
+
+ client.add(doc3);
+
+ client.commit(COLLECTION);
+
+ System.out.println("Sent producer record");
+
+ Thread.sleep(15000);
+
+ solrCluster2.getJettySolrRunner(0).start();
+ Thread.sleep(10000);
+
+ QueryResponse results = null;
+ boolean foundUpdates = false;
+ for (int i = 0; i < 200; i++) {
+ solrCluster2.getSolrClient().commit(COLLECTION);
+ solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ results = solrCluster2.getSolrClient().query(COLLECTION, new
SolrQuery("*:*"));
+ if (results.getResults().getNumFound() == 3) {
+ foundUpdates = true;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+
+ assertTrue("results=" + results, foundUpdates);
+ System.out.println("Rest: " + results);
+
+ }
+
+ @Test
+ public void testRetryQueueZKDown() throws Exception {
Path zkDir = zkTestServer2.getZkDir();
int zkPort = zkTestServer2.getPort();
zkTestServer2.shutdown();
@@ -208,14 +258,16 @@ import java.util.Properties;
System.out.println("Sent producer record");
- Thread.sleep(5000);
+ Thread.sleep(15000);
zkTestServer2 = new ZkTestServer(zkDir, zkPort);
zkTestServer2.run(false);
+ Thread.sleep(10000);
+
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < 15; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new
SolrQuery("*:*"));
@@ -226,8 +278,6 @@ import java.util.Properties;
}
}
- System.out.println("Closed producer");
-
assertTrue("results=" + results, foundUpdates);
System.out.println("Rest: " + results);