This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 35fc2c2  Add the request back to the queue on fail, not just the last record. (#75)
35fc2c2 is described below

commit 35fc2c2241d8e3836622fecb08d5729cfdf89970
Author: Mark Robert Miller <[email protected]>
AuthorDate: Tue Oct 3 17:11:56 2023 -0500

    Add the request back to the queue on fail, not just the last record. (#75)
---
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |  2 +-
 .../crossdc/consumer/KafkaCrossDcConsumerTest.java |  5 +-
 crossdc-producer/build.gradle                      |  1 -
 .../solr/crossdc/RetryQueueIntegrationTest.java    | 60 ++++++++++++++++++++--
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 1327a42..a221edc 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -329,7 +329,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
           log.trace("result=failed-resubmit");
         }
         metrics.counter("failed-resubmit").inc();
-        kafkaMirroringSink.submit(record.value());
+        kafkaMirroringSink.submit(result.newItem());
         break;
       case HANDLED:
         // no-op
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 4d503c0..709472c 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -168,7 +168,8 @@ public class KafkaCrossDcConsumerTest {
 
         // Set up the SolrMessageProcessor mock
         SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
-        IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
+        MirroredSolrRequest request = new MirroredSolrRequest(new UpdateRequest());
+        IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null, request);
         when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
 
         // Mock the KafkaMirroringSink
@@ -181,7 +182,7 @@ public class KafkaCrossDcConsumerTest {
         consumer.processResult(record, failedResubmitResult);
 
         // Verify that the KafkaMirroringSink.submit() method was called
-        verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
+        verify(consumer.kafkaMirroringSink, times(1)).submit(request);
     }
 
 
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 3335a01..5d91ced 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -39,7 +39,6 @@ dependencies {
 
     provided  group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
 
-    testImplementation 'org.slf4j:slf4j-api:2.0.5'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
     testImplementation('org.mockito:mockito-inline:5.1.1')
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 6d2d543..3877dfc 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -180,7 +180,57 @@ import java.util.Properties;
   }
 
   @Test
-  public void testRetryQueue() throws Exception {
+  public void testRetryQueueSolrDown() throws Exception {
+    solrCluster2.getJettySolrRunner(0).stop();
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.nanoTime()));
+    doc.addField("text", "some test");
+
+    client.add(doc);
+
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField("id", String.valueOf(System.nanoTime()));
+    doc2.addField("text", "some test");
+
+    client.add(doc2);
+
+    SolrInputDocument doc3 = new SolrInputDocument();
+    doc3.addField("id", String.valueOf(System.nanoTime()));
+    doc3.addField("text", "some test");
+
+    client.add(doc3);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    Thread.sleep(15000);
+
+    solrCluster2.getJettySolrRunner(0).start();
+    Thread.sleep(10000);
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 200; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 3) {
+        foundUpdates = true;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+  }
+
+    @Test
+  public void testRetryQueueZKDown() throws Exception {
     Path zkDir = zkTestServer2.getZkDir();
     int zkPort = zkTestServer2.getPort();
     zkTestServer2.shutdown();
@@ -208,14 +258,16 @@ import java.util.Properties;
 
     System.out.println("Sent producer record");
 
-    Thread.sleep(5000);
+    Thread.sleep(15000);
 
     zkTestServer2 = new ZkTestServer(zkDir, zkPort);
     zkTestServer2.run(false);
 
+    Thread.sleep(10000);
+
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 200; i++) {
+    for (int i = 0; i < 15; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
@@ -226,8 +278,6 @@ import java.util.Properties;
       }
     }
 
-    System.out.println("Closed producer");
-
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 

Reply via email to