This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new e0193cf  Add dead letter queue functionality on the producer and consumer side (#70)
e0193cf is described below

commit e0193cf666c70f3358256c0b2979c636cb90bf09
Author: Marcin Górski <[email protected]>
AuthorDate: Tue Oct 17 11:18:50 2023 +0200

    Add dead letter queue functionality on the producer and consumer side (#70)
---
 .../solr/crossdc/common/KafkaCrossDcConf.java      |  9 ++-
 .../solr/crossdc/common/KafkaMirroringSink.java    | 64 +++++++++++++++++++++-
 .../solr/crossdc/common/RequestMirroringSink.java  |  2 +
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 10 +++-
 .../processor/KafkaRequestMirroringHandler.java    | 12 +++-
 .../crossdc/KafkaRequestMirroringHandlerTest.java  | 39 +++++++++++++
 .../admin/MirroringCollectionsHandlerTest.java     |  2 -
 7 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 3147643..eaafb0f 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -65,6 +65,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String TOPIC_NAME = "topicName";
 
+  public static final String DLQ_TOPIC_NAME = "dlqTopicName";
+
+  public static final String MAX_ATTEMPTS = "maxAttempts";
+
   public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
 
   public static final String BATCH_SIZE_BYTES = "batchSizeBytes";
@@ -130,7 +134,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   static {
     List<ConfigProperty> configProperties = new ArrayList<>(
-        List.of(new ConfigProperty(TOPIC_NAME), new 
ConfigProperty(BOOTSTRAP_SERVERS),
+        List.of(new ConfigProperty(TOPIC_NAME),
+            new ConfigProperty(DLQ_TOPIC_NAME),
+            new ConfigProperty(MAX_ATTEMPTS, "3"),
+            new ConfigProperty(BOOTSTRAP_SERVERS),
             new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
             new ConfigProperty(BUFFER_MEMORY_BYTES, 
DEFAULT_BUFFER_MEMORY_BYTES),
             new ConfigProperty(LINGER_MS, DEFAULT_LINGER_MS),
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 728d116..952e930 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -16,10 +16,14 @@
  */
 package org.apache.solr.crossdc.common;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +31,8 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -38,15 +44,49 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
 
     private final KafkaCrossDcConf conf;
     private final Producer<String, MirroredSolrRequest> producer;
+    private final KafkaConsumer<String,MirroredSolrRequest> consumer;
+    private final String mainTopic;
+    private final String dlqTopic;
 
     public KafkaMirroringSink(final KafkaCrossDcConf conf) {
         // Create Kafka Mirroring Sink
         this.conf = conf;
         this.producer = initProducer();
+        this.consumer = initConsumer();
+        this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
+        this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);
+
+        checkTopicsAvailability();
     }
 
     @Override
     public void submit(MirroredSolrRequest request) throws MirroringException {
+        this.submitRequest(request, mainTopic);
+    }
+
+    @Override
+    public void submitToDlq(MirroredSolrRequest request) throws 
MirroringException {
+        if (dlqTopic != null) {
+            this.submitRequest(request, dlqTopic);
+        } else {
+            if (log.isInfoEnabled()) {
+                log.info("- no DLQ, dropping failed {}", request);
+            }
+        }
+    }
+
+    private void checkTopicsAvailability() {
+        final Map<String, List<PartitionInfo>> topics = 
this.consumer.listTopics();
+
+        if (mainTopic != null && !topics.containsKey(mainTopic)) {
+            throw new RuntimeException("Main topic " + mainTopic + " is not 
available");
+        }
+        if (dlqTopic != null && !topics.containsKey(dlqTopic)) {
+            throw new RuntimeException("DLQ topic " + dlqTopic + " is not 
available");
+        }
+    }
+
+    private void submitRequest(MirroredSolrRequest request, String topicName) 
throws MirroringException {
         if (log.isDebugEnabled()) {
             log.debug("About to submit a MirroredSolrRequest");
         }
@@ -56,7 +96,7 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         // Create Producer record
         try {
 
-            producer.send(new 
ProducerRecord<>(conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0], request), 
(metadata, exception) -> {
+            producer.send(new ProducerRecord<>(topicName, request), (metadata, 
exception) -> {
                 if (exception != null) {
                     log.error("Failed adding update to CrossDC queue! 
request=" + request.getSolrRequest(), exception);
                 }
@@ -73,7 +113,7 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         } catch (Exception e) {
             // We are intentionally catching all exceptions, the expected 
exception form this function is {@link MirroringException}
             String message = "Unable to enqueue request " + request + ", 
configured retries is" + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
-                " and configured max delivery timeout in ms is " + 
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
+                    " and configured max delivery timeout in ms is " + 
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
             log.error(message, e);
             throw new MirroringException(message, e);
         }
@@ -130,6 +170,26 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         return producer;
     }
 
+    private KafkaConsumer<String, MirroredSolrRequest> initConsumer() {
+        final Properties kafkaConsumerProperties = new Properties();
+
+        kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+        kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, 
conf.get(KafkaCrossDcConf.GROUP_ID));
+        kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+        
kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+        kafkaConsumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
+        kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false);
+        kafkaConsumerProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+        kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+        kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+        
kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+        kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
+        kafkaConsumerProperties.putAll(conf.getAdditionalProperties());
+
+        return new KafkaConsumer<>(kafkaConsumerProperties, new 
StringDeserializer(), new MirroredSolrRequestSerializer());
+    }
+
     private void slowSubmitAction(Object request, long elapsedTimeMillis) {
         log.warn("Enqueuing the request to Kafka took more than {} millis. 
enqueueElapsedTime={}",
                 conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
index e8b2c69..efd1e7e 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
@@ -26,4 +26,6 @@ public interface RequestMirroringSink {
      * @throws MirroringException Implementations may throw an exception
      */
     void submit(final MirroredSolrRequest request) throws MirroringException;
+
+    void submitToDlq(final MirroredSolrRequest request) throws 
MirroringException;
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index a221edc..63939ff 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -44,6 +44,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
   private final String[] topicNames;
+  private final int maxAttempts;
   private final SolrMessageProcessor messageProcessor;
 
   private final CloudSolrClient solrClient;
@@ -68,6 +69,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch 
startLatch) {
 
     this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
+    this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
     this.startLatch = startLatch;
     final Properties kafkaConsumerProps = new Properties();
 
@@ -329,7 +331,13 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
           log.trace("result=failed-resubmit");
         }
         metrics.counter("failed-resubmit").inc();
-        kafkaMirroringSink.submit(result.newItem());
+        final int attempt = record.value().getAttempt();
+        if (attempt > this.maxAttempts) {
+          log.info("Sending message to dead letter queue because of max 
attempts limit with current value = {}", attempt);
+          kafkaMirroringSink.submitToDlq(result.newItem());
+        } else {
+          kafkaMirroringSink.submit(result.newItem());
+        }
         break;
       case HANDLED:
         // no-op
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index b618f01..9141a76 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -19,7 +19,6 @@ package org.apache.solr.update.processor;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.crossdc.common.MirroringException;
-import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +48,15 @@ public class KafkaRequestMirroringHandler implements 
RequestMirroringHandler {
             log.trace("submit update to sink docs={}, deletes={}, params={}", 
request.getDocuments(), request.getDeleteById(), request.getParams());
         }
         // TODO: Enforce external version constraint for consistent update 
replication (cross-cluster)
-        sink.submit(new MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE, 
1, request, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis())));
+        final MirroredSolrRequest mirroredRequest = new 
MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE, 1, request, 
TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+        try {
+            sink.submit(mirroredRequest);
+        } catch (MirroringException exception) {
+            if (log.isInfoEnabled()) {
+                log.info("Sending message to dead letter queue");
+            }
+            sink.submitToDlq(mirroredRequest);
+            throw new MirroringException(exception);
+        }
     }
 }
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
new file mode 100644
index 0000000..1b2dd12
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
@@ -0,0 +1,39 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
+import org.apache.solr.update.processor.KafkaRequestMirroringHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRequestMirroringHandlerTest {
+
+    @Mock
+    KafkaMirroringSink kafkaMirroringSink;
+
+    @Test
+    public void testCheckDeadLetterQueueMessageExecution() throws 
MirroringException {
+        
doThrow(MirroringException.class).when(kafkaMirroringSink).submit(any());
+
+        final UpdateRequest updateRequest = new UpdateRequest();
+        final KafkaRequestMirroringHandler kafkaRequestMirroringHandler = new 
KafkaRequestMirroringHandler(kafkaMirroringSink);
+
+        try {
+            kafkaRequestMirroringHandler.mirror(updateRequest);
+        } catch (MirroringException exception) {
+            // do nothing
+        }
+
+        verify(kafkaMirroringSink, times(1)).submitToDlq(any());
+    }
+
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java b/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java
index 4bedc51..4f333ab 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java
@@ -2,7 +2,6 @@ package org.apache.solr.handler.admin;
 
 import org.apache.commons.io.IOUtils;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import org.apache.commons.io.IOUtils;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
 import org.apache.solr.SolrTestCaseJ4;
@@ -14,7 +13,6 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrXmlConfig;
-import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.crossdc.SolrKafkaTestsIgnoredThreadsFilter;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;

Reply via email to