This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new d952e0948ca SOLR-18063: Fix NPE in CrossDC Consumer when resubmitting 
requests to dead-letter queue. (#4045)
d952e0948ca is described below

commit d952e0948ca469998ceadf59a96a90e6249a918a
Author: Andrzej Białecki <[email protected]>
AuthorDate: Mon Jan 26 17:38:01 2026 +0100

    SOLR-18063: Fix NPE in CrossDC Consumer when resubmitting requests to 
dead-letter queue. (#4045)
---
 changelog/unreleased/solr-18063.yml                |  9 +++++
 .../manager/consumer/KafkaCrossDcConsumer.java     | 38 +++++++++++++++++-----
 .../messageprocessor/SolrMessageProcessor.java     | 29 ++++++++++++++---
 .../manager/consumer/KafkaCrossDcConsumerTest.java |  8 ++---
 .../messageprocessor/SolrMessageProcessorTest.java |  6 ++--
 .../apache/solr/crossdc/common/IQueueHandler.java  | 12 ++-----
 6 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/changelog/unreleased/solr-18063.yml 
b/changelog/unreleased/solr-18063.yml
new file mode 100644
index 00000000000..914ddf2bfb6
--- /dev/null
+++ b/changelog/unreleased/solr-18063.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: SOLR-18063 -  NPE when resubmitting to DLQ
+type: fixed
+authors:
+  - name: Andrzej Bialecki
+    nick: ab
+links:
+  - name: SOLR-18063
+    url: https://issues.apache.org/jira/browse/SOLR-18063
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index bf1ba691f26..dd7aabef25a 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -527,15 +527,24 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
           log.trace("result=failed-resubmit");
         }
         final int attempt = item.getAttempt();
-        if (attempt > this.maxAttempts) {
-          log.info(
-              "Sending message to dead letter queue because of max attempts 
limit with current value = {}",
-              attempt);
-          kafkaMirroringSink.submitToDlq(item);
-          metrics.counter(MetricRegistry.name(type.name(), 
"failed-dlq")).inc();
-        } else {
-          kafkaMirroringSink.submit(item);
-          metrics.counter(MetricRegistry.name(type.name(), 
"failed-resubmit")).inc();
+        final boolean dlq = attempt > this.maxAttempts;
+        try {
+          if (dlq) {
+            log.info(
+                "Sending message to dead letter queue because of max attempts 
limit with current value = {}",
+                attempt);
+            kafkaMirroringSink.submitToDlq(item);
+            metrics.counter(MetricRegistry.name(type.name(), 
"failed-dlq")).inc();
+          } else {
+            kafkaMirroringSink.submit(item);
+            metrics.counter(MetricRegistry.name(type.name(), 
"failed-resubmit")).inc();
+          }
+        } catch (Exception e) {
+          log.error(
+              "Failed to {}, msg={}",
+              dlq ? "send message to dead-letter queue" : "resubmit message 
for retry",
+              item,
+              e);
         }
         break;
       case HANDLED:
@@ -556,6 +565,17 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
             "Unexpected response while processing request. We never expect 
{}.", result.status());
         metrics.counter(MetricRegistry.name(type.name(), 
"failed-retry")).inc();
         break;
+      case FAILED_NO_RETRY:
+        if (log.isDebugEnabled()) {
+          log.debug("Failed no-retry: sending message to dead-letter queue");
+        }
+        try {
+          kafkaMirroringSink.submitToDlq(item);
+        } catch (Exception e) {
+          log.error("Failed to send message to dead-letter queue, msg={}", 
item, e);
+        }
+        metrics.counter(MetricRegistry.name(type.name(), 
"failed-no-retry")).inc();
+        break;
       default:
         if (log.isTraceEnabled()) {
           log.trace("result=no matching case");
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
index f0ac2b182a2..29f83052fe1 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
@@ -31,6 +31,8 @@ import org.apache.solr.client.solrj.response.SolrResponseBase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -108,7 +110,7 @@ public class SolrMessageProcessor extends MessageProcessor
     try {
       prepareIfUpdateRequest(request);
       logRequest(request);
-      result = processMirroredSolrRequest(request, 
mirroredSolrRequest.getType());
+      result = processMirroredSolrRequest(mirroredSolrRequest);
     } catch (Exception e) {
       result = handleException(mirroredSolrRequest, e);
     }
@@ -124,7 +126,7 @@ public class SolrMessageProcessor extends MessageProcessor
     logIf4xxException(solrException);
     if (!isRetryable(e)) {
       log.error("Non retryable exception processing Solr update", e);
-      return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
+      return new Result<>(ResultStatus.FAILED_NO_RETRY, e, 
mirroredSolrRequest);
     } else {
       logFailure(mirroredSolrRequest, e, solrException);
       mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1);
@@ -188,13 +190,32 @@ public class SolrMessageProcessor extends MessageProcessor
 
   /** Process the SolrRequest. If not, this method throws an exception. */
   private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
-      SolrRequest<?> request, MirroredSolrRequest.Type type) throws Exception {
+      MirroredSolrRequest<?> mirroredSolrRequest) throws Exception {
+    final SolrRequest<?> request = mirroredSolrRequest.getSolrRequest();
+    final MirroredSolrRequest.Type type = mirroredSolrRequest.getType();
     if (log.isDebugEnabled()) {
       log.debug(
           "Sending request to Solr at ZK address={} with params {}",
           
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
           request.getParams());
     }
+    // short-circuit requests to nonexistent or not updatable collections
+    if (type == MirroredSolrRequest.Type.UPDATE && request.getCollection() != 
null) {
+      ClusterState clusterState = clientSupplier.get().getClusterState();
+      DocCollection docCollection = 
clusterState.getCollectionOrNull(request.getCollection());
+      if (docCollection == null
+          || docCollection.isReadOnly()
+          || docCollection.getActiveSlices().isEmpty()) {
+        if (log.isInfoEnabled()) {
+          log.warn(
+              "Skipping update request to nonexistent / not updatable 
collection {}",
+              request.getCollection());
+        }
+        metrics.counter(MetricRegistry.name(type.name(), 
"invalid-collection")).inc();
+        return new Result<>(ResultStatus.FAILED_NO_RETRY, mirroredSolrRequest);
+      }
+    }
+
     Result<MirroredSolrRequest<?>> result;
     SolrResponseBase response;
     Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), 
"outputTime")).time();
@@ -222,7 +243,7 @@ public class SolrMessageProcessor extends MessageProcessor
           request.getParams(),
           status);
     }
-    result = new Result<>(ResultStatus.HANDLED);
+    result = new Result<>(ResultStatus.HANDLED, mirroredSolrRequest);
     return result;
   }
 
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
index 328a629a9fe..9f8e904a72b 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
@@ -297,7 +297,7 @@ public class KafkaCrossDcConsumerTest {
   public void testHandleValidMirroredSolrRequest() {
     KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = 
mock(KafkaConsumer.class);
     KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
-    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, 
null))
         .when(messageProcessorMock)
         .handleItem(any());
     SolrInputDocument doc = new SolrInputDocument();
@@ -336,7 +336,7 @@ public class KafkaCrossDcConsumerTest {
   public void testHandleValidAdminRequest() throws Exception {
     KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = 
mock(KafkaConsumer.class);
     KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
-    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, 
null))
         .when(messageProcessorMock)
         .handleItem(any());
     CollectionAdminRequest.Create create =
@@ -417,7 +417,7 @@ public class KafkaCrossDcConsumerTest {
             CrossDcConf.COLLAPSE_UPDATES, collapseUpdates.name(),
             CrossDcConf.MAX_COLLAPSE_RECORDS, 
String.valueOf(maxCollapseRecords));
     KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
-    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, 
null))
         .when(messageProcessorMock)
         .handleItem(any());
     List<ConsumerRecord<String, MirroredSolrRequest<?>>> records = new 
ArrayList<>();
@@ -452,7 +452,7 @@ public class KafkaCrossDcConsumerTest {
   public void testHandleInvalidMirroredSolrRequest() {
     KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = 
mock(KafkaConsumer.class);
     SolrMessageProcessor mockSolrMessageProcessor = 
mock(SolrMessageProcessor.class);
-    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+    doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, 
null))
         .when(mockSolrMessageProcessor)
         .handleItem(any());
     KafkaCrossDcConsumer spyConsumer =
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
index 2a1b552ab26..6110cfbbbad 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
@@ -18,7 +18,7 @@ package org.apache.solr.crossdc.manager.messageprocessor;
 
 import static org.apache.solr.SolrTestCaseJ4.assumeWorkingMockito;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -76,6 +76,7 @@ public class SolrMessageProcessorTest {
         solrMessageProcessor.handleItem(mirroredSolrRequest);
 
     assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+    assertNotNull(result.getItem());
   }
 
   /** Should handle MirroredSolrRequest and return a failed result with 
resubmit */
@@ -92,6 +93,7 @@ public class SolrMessageProcessorTest {
         solrMessageProcessor.handleItem(mirroredSolrRequest);
 
     assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+    assertNotNull(result.getItem());
     assertEquals(mirroredSolrRequest, result.getItem());
   }
 
@@ -111,7 +113,7 @@ public class SolrMessageProcessorTest {
         solrMessageProcessor.handleItem(mirroredSolrRequest);
 
     assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
-    assertNull(result.getItem());
+    assertNotNull(result.getItem());
   }
 
   /** Should connect to Solr if not connected and process the request */
diff --git 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
index f4fa5f7ae82..145630deee7 100644
--- 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
+++ 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -39,16 +39,8 @@ public interface IQueueHandler<T> {
     private final Throwable _throwable;
     private final T _item;
 
-    public Result(final ResultStatus status) {
-      _status = status;
-      _throwable = null;
-      _item = null;
-    }
-
-    public Result(final ResultStatus status, final Throwable throwable) {
-      _status = status;
-      _throwable = throwable;
-      _item = null;
+    public Result(final ResultStatus status, final T newItem) {
+      this(status, null, newItem);
     }
 
     public Result(final ResultStatus status, final Throwable throwable, final 
T newItem) {

Reply via email to