This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new e0193cf Add dead letter queue functionality on the producer and consumer side (#70)
e0193cf is described below
commit e0193cf666c70f3358256c0b2979c636cb90bf09
Author: Marcin Górski <[email protected]>
AuthorDate: Tue Oct 17 11:18:50 2023 +0200
Add dead letter queue functionality on the producer and consumer side (#70)
---
.../solr/crossdc/common/KafkaCrossDcConf.java | 9 ++-
.../solr/crossdc/common/KafkaMirroringSink.java | 64 +++++++++++++++++++++-
.../solr/crossdc/common/RequestMirroringSink.java | 2 +
.../crossdc/consumer/KafkaCrossDcConsumer.java | 10 +++-
.../processor/KafkaRequestMirroringHandler.java | 12 +++-
.../crossdc/KafkaRequestMirroringHandlerTest.java | 39 +++++++++++++
.../admin/MirroringCollectionsHandlerTest.java | 2 -
7 files changed, 130 insertions(+), 8 deletions(-)
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 3147643..eaafb0f 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -65,6 +65,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String TOPIC_NAME = "topicName";
+ public static final String DLQ_TOPIC_NAME = "dlqTopicName";
+
+ public static final String MAX_ATTEMPTS = "maxAttempts";
+
public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
public static final String BATCH_SIZE_BYTES = "batchSizeBytes";
@@ -130,7 +134,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
static {
List<ConfigProperty> configProperties = new ArrayList<>(
- List.of(new ConfigProperty(TOPIC_NAME), new
ConfigProperty(BOOTSTRAP_SERVERS),
+ List.of(new ConfigProperty(TOPIC_NAME),
+ new ConfigProperty(DLQ_TOPIC_NAME),
+ new ConfigProperty(MAX_ATTEMPTS, "3"),
+ new ConfigProperty(BOOTSTRAP_SERVERS),
new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
new ConfigProperty(BUFFER_MEMORY_BYTES,
DEFAULT_BUFFER_MEMORY_BYTES),
new ConfigProperty(LINGER_MS, DEFAULT_LINGER_MS),
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 728d116..952e930 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -16,10 +16,14 @@
*/
package org.apache.solr.crossdc.common;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +31,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -38,15 +44,49 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
private final KafkaCrossDcConf conf;
private final Producer<String, MirroredSolrRequest> producer;
+ private final KafkaConsumer<String,MirroredSolrRequest> consumer;
+ private final String mainTopic;
+ private final String dlqTopic;
public KafkaMirroringSink(final KafkaCrossDcConf conf) {
// Create Kafka Mirroring Sink
this.conf = conf;
this.producer = initProducer();
+ this.consumer = initConsumer();
+ this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
+ this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);
+
+ checkTopicsAvailability();
}
@Override
public void submit(MirroredSolrRequest request) throws MirroringException {
+ this.submitRequest(request, mainTopic);
+ }
+
+ @Override
+ public void submitToDlq(MirroredSolrRequest request) throws
MirroringException {
+ if (dlqTopic != null) {
+ this.submitRequest(request, dlqTopic);
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info("- no DLQ, dropping failed {}", request);
+ }
+ }
+ }
+
+ private void checkTopicsAvailability() {
+ final Map<String, List<PartitionInfo>> topics =
this.consumer.listTopics();
+
+ if (mainTopic != null && !topics.containsKey(mainTopic)) {
+ throw new RuntimeException("Main topic " + mainTopic + " is not
available");
+ }
+ if (dlqTopic != null && !topics.containsKey(dlqTopic)) {
+ throw new RuntimeException("DLQ topic " + dlqTopic + " is not
available");
+ }
+ }
+
+ private void submitRequest(MirroredSolrRequest request, String topicName)
throws MirroringException {
if (log.isDebugEnabled()) {
log.debug("About to submit a MirroredSolrRequest");
}
@@ -56,7 +96,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// Create Producer record
try {
- producer.send(new
ProducerRecord<>(conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0], request),
(metadata, exception) -> {
+ producer.send(new ProducerRecord<>(topicName, request), (metadata,
exception) -> {
if (exception != null) {
log.error("Failed adding update to CrossDC queue!
request=" + request.getSolrRequest(), exception);
}
@@ -73,7 +113,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
} catch (Exception e) {
// We are intentionally catching all exceptions, the expected
exception form this function is {@link MirroringException}
String message = "Unable to enqueue request " + request + ",
configured retries is" + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
- " and configured max delivery timeout in ms is " +
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
+ " and configured max delivery timeout in ms is " +
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
log.error(message, e);
throw new MirroringException(message, e);
}
@@ -130,6 +170,26 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
return producer;
}
+ private KafkaConsumer<String, MirroredSolrRequest> initConsumer() {
+ final Properties kafkaConsumerProperties = new Properties();
+
+ kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
conf.get(KafkaCrossDcConf.GROUP_ID));
+ kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+
kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+ kafkaConsumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
+ kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
+ kafkaConsumerProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+ kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+ kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+
kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+ kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
+ kafkaConsumerProperties.putAll(conf.getAdditionalProperties());
+
+ return new KafkaConsumer<>(kafkaConsumerProperties, new
StringDeserializer(), new MirroredSolrRequestSerializer());
+ }
+
private void slowSubmitAction(Object request, long elapsedTimeMillis) {
log.warn("Enqueuing the request to Kafka took more than {} millis.
enqueueElapsedTime={}",
conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
index e8b2c69..efd1e7e 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
@@ -26,4 +26,6 @@ public interface RequestMirroringSink {
* @throws MirroringException Implementations may throw an exception
*/
void submit(final MirroredSolrRequest request) throws MirroringException;
+
+ void submitToDlq(final MirroredSolrRequest request) throws
MirroringException;
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index a221edc..63939ff 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -44,6 +44,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
private final String[] topicNames;
+ private final int maxAttempts;
private final SolrMessageProcessor messageProcessor;
private final CloudSolrClient solrClient;
@@ -68,6 +69,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch
startLatch) {
this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
+ this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
this.startLatch = startLatch;
final Properties kafkaConsumerProps = new Properties();
@@ -329,7 +331,13 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
log.trace("result=failed-resubmit");
}
metrics.counter("failed-resubmit").inc();
- kafkaMirroringSink.submit(result.newItem());
+ final int attempt = record.value().getAttempt();
+ if (attempt > this.maxAttempts) {
+ log.info("Sending message to dead letter queue because of max
attempts limit with current value = {}", attempt);
+ kafkaMirroringSink.submitToDlq(result.newItem());
+ } else {
+ kafkaMirroringSink.submit(result.newItem());
+ }
break;
case HANDLED:
// no-op
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index b618f01..9141a76 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -19,7 +19,6 @@ package org.apache.solr.update.processor;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.crossdc.common.MirroringException;
-import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +48,15 @@ public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
log.trace("submit update to sink docs={}, deletes={}, params={}",
request.getDocuments(), request.getDeleteById(), request.getParams());
}
// TODO: Enforce external version constraint for consistent update
replication (cross-cluster)
- sink.submit(new MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE,
1, request, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis())));
+ final MirroredSolrRequest mirroredRequest = new
MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE, 1, request,
TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+ try {
+ sink.submit(mirroredRequest);
+ } catch (MirroringException exception) {
+ if (log.isInfoEnabled()) {
+ log.info("Sending message to dead letter queue");
+ }
+ sink.submitToDlq(mirroredRequest);
+ throw new MirroringException(exception);
+ }
}
}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
new file mode 100644
index 0000000..1b2dd12
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
@@ -0,0 +1,39 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
+import org.apache.solr.update.processor.KafkaRequestMirroringHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRequestMirroringHandlerTest {
+
+ @Mock
+ KafkaMirroringSink kafkaMirroringSink;
+
+ @Test
+ public void testCheckDeadLetterQueueMessageExecution() throws
MirroringException {
+
doThrow(MirroringException.class).when(kafkaMirroringSink).submit(any());
+
+ final UpdateRequest updateRequest = new UpdateRequest();
+ final KafkaRequestMirroringHandler kafkaRequestMirroringHandler = new
KafkaRequestMirroringHandler(kafkaMirroringSink);
+
+ try {
+ kafkaRequestMirroringHandler.mirror(updateRequest);
+ } catch (MirroringException exception) {
+ // do nothing
+ }
+
+ verify(kafkaMirroringSink, times(1)).submitToDlq(any());
+ }
+
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java b/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java
index 4bedc51..4f333ab 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/handler/admin/MirroringCollectionsHandlerTest.java
@@ -2,7 +2,6 @@ package org.apache.solr.handler.admin;
import org.apache.commons.io.IOUtils;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import org.apache.commons.io.IOUtils;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
import org.apache.solr.SolrTestCaseJ4;
@@ -14,7 +13,6 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrXmlConfig;
-import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.SolrKafkaTestsIgnoredThreadsFilter;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;