This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_10x by this push:
new ebc7f85eed3 SOLR-18063: Fix NPE in CrossDC Consumer when resubmitting
requests to DLQ (cherry-pick from #4045) (#4085)
ebc7f85eed3 is described below
commit ebc7f85eed31819c823eba694addb644ac169e8a
Author: Andrzej BiaĆecki <[email protected]>
AuthorDate: Thu Jan 29 14:33:49 2026 +0100
SOLR-18063: Fix NPE in CrossDC Consumer when resubmitting requests to DLQ
(cherry-pick from #4045) (#4085)
---
changelog/unreleased/solr-18063.yml | 9 +++++
.../manager/consumer/KafkaCrossDcConsumer.java | 38 +++++++++++++++++-----
.../messageprocessor/SolrMessageProcessor.java | 29 ++++++++++++++---
.../manager/consumer/KafkaCrossDcConsumerTest.java | 8 ++---
.../messageprocessor/SolrMessageProcessorTest.java | 6 ++--
.../apache/solr/crossdc/common/IQueueHandler.java | 12 ++-----
6 files changed, 73 insertions(+), 29 deletions(-)
diff --git a/changelog/unreleased/solr-18063.yml
b/changelog/unreleased/solr-18063.yml
new file mode 100644
index 00000000000..914ddf2bfb6
--- /dev/null
+++ b/changelog/unreleased/solr-18063.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: SOLR-18063 - NPE when resubmitting to DLQ
+type: fixed
+authors:
+ - name: Andrzej Bialecki
+ nick: ab
+links:
+ - name: SOLR-18063
+ url: https://issues.apache.org/jira/browse/SOLR-18063
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index bf1ba691f26..dd7aabef25a 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -527,15 +527,24 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.trace("result=failed-resubmit");
}
final int attempt = item.getAttempt();
- if (attempt > this.maxAttempts) {
- log.info(
- "Sending message to dead letter queue because of max attempts
limit with current value = {}",
- attempt);
- kafkaMirroringSink.submitToDlq(item);
- metrics.counter(MetricRegistry.name(type.name(),
"failed-dlq")).inc();
- } else {
- kafkaMirroringSink.submit(item);
- metrics.counter(MetricRegistry.name(type.name(),
"failed-resubmit")).inc();
+ final boolean dlq = attempt > this.maxAttempts;
+ try {
+ if (dlq) {
+ log.info(
+ "Sending message to dead letter queue because of max attempts
limit with current value = {}",
+ attempt);
+ kafkaMirroringSink.submitToDlq(item);
+ metrics.counter(MetricRegistry.name(type.name(),
"failed-dlq")).inc();
+ } else {
+ kafkaMirroringSink.submit(item);
+ metrics.counter(MetricRegistry.name(type.name(),
"failed-resubmit")).inc();
+ }
+ } catch (Exception e) {
+ log.error(
+ "Failed to {}, msg={}",
+ dlq ? "send message to dead-letter queue" : "resubmit message
for retry",
+ item,
+ e);
}
break;
case HANDLED:
@@ -556,6 +565,17 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
"Unexpected response while processing request. We never expect
{}.", result.status());
metrics.counter(MetricRegistry.name(type.name(),
"failed-retry")).inc();
break;
+ case FAILED_NO_RETRY:
+ if (log.isDebugEnabled()) {
+ log.debug("Failed no-retry: sending message to dead-letter queue");
+ }
+ try {
+ kafkaMirroringSink.submitToDlq(item);
+ } catch (Exception e) {
+ log.error("Failed to send message to dead-letter queue, msg={}",
item, e);
+ }
+ metrics.counter(MetricRegistry.name(type.name(),
"failed-no-retry")).inc();
+ break;
default:
if (log.isTraceEnabled()) {
log.trace("result=no matching case");
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
index f0ac2b182a2..29f83052fe1 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
@@ -31,6 +31,8 @@ import org.apache.solr.client.solrj.response.SolrResponseBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -108,7 +110,7 @@ public class SolrMessageProcessor extends MessageProcessor
try {
prepareIfUpdateRequest(request);
logRequest(request);
- result = processMirroredSolrRequest(request,
mirroredSolrRequest.getType());
+ result = processMirroredSolrRequest(mirroredSolrRequest);
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
}
@@ -124,7 +126,7 @@ public class SolrMessageProcessor extends MessageProcessor
logIf4xxException(solrException);
if (!isRetryable(e)) {
log.error("Non retryable exception processing Solr update", e);
- return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
+ return new Result<>(ResultStatus.FAILED_NO_RETRY, e,
mirroredSolrRequest);
} else {
logFailure(mirroredSolrRequest, e, solrException);
mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1);
@@ -188,13 +190,32 @@ public class SolrMessageProcessor extends MessageProcessor
/** Process the SolrRequest. If not, this method throws an exception. */
private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
- SolrRequest<?> request, MirroredSolrRequest.Type type) throws Exception {
+ MirroredSolrRequest<?> mirroredSolrRequest) throws Exception {
+ final SolrRequest<?> request = mirroredSolrRequest.getSolrRequest();
+ final MirroredSolrRequest.Type type = mirroredSolrRequest.getType();
if (log.isDebugEnabled()) {
log.debug(
"Sending request to Solr at ZK address={} with params {}",
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams());
}
+ // short-circuit requests to nonexistent or not updatable collections
+ if (type == MirroredSolrRequest.Type.UPDATE && request.getCollection() !=
null) {
+ ClusterState clusterState = clientSupplier.get().getClusterState();
+ DocCollection docCollection =
clusterState.getCollectionOrNull(request.getCollection());
+ if (docCollection == null
+ || docCollection.isReadOnly()
+ || docCollection.getActiveSlices().isEmpty()) {
+ if (log.isInfoEnabled()) {
+ log.warn(
+ "Skipping update request to nonexistent / not updatable
collection {}",
+ request.getCollection());
+ }
+ metrics.counter(MetricRegistry.name(type.name(),
"invalid-collection")).inc();
+ return new Result<>(ResultStatus.FAILED_NO_RETRY, mirroredSolrRequest);
+ }
+ }
+
Result<MirroredSolrRequest<?>> result;
SolrResponseBase response;
Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(),
"outputTime")).time();
@@ -222,7 +243,7 @@ public class SolrMessageProcessor extends MessageProcessor
request.getParams(),
status);
}
- result = new Result<>(ResultStatus.HANDLED);
+ result = new Result<>(ResultStatus.HANDLED, mirroredSolrRequest);
return result;
}
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
index 328a629a9fe..9f8e904a72b 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
@@ -297,7 +297,7 @@ public class KafkaCrossDcConsumerTest {
public void testHandleValidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer =
mock(KafkaConsumer.class);
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
- doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED,
null))
.when(messageProcessorMock)
.handleItem(any());
SolrInputDocument doc = new SolrInputDocument();
@@ -336,7 +336,7 @@ public class KafkaCrossDcConsumerTest {
public void testHandleValidAdminRequest() throws Exception {
KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer =
mock(KafkaConsumer.class);
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
- doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED,
null))
.when(messageProcessorMock)
.handleItem(any());
CollectionAdminRequest.Create create =
@@ -417,7 +417,7 @@ public class KafkaCrossDcConsumerTest {
CrossDcConf.COLLAPSE_UPDATES, collapseUpdates.name(),
CrossDcConf.MAX_COLLAPSE_RECORDS,
String.valueOf(maxCollapseRecords));
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
- doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED,
null))
.when(messageProcessorMock)
.handleItem(any());
List<ConsumerRecord<String, MirroredSolrRequest<?>>> records = new
ArrayList<>();
@@ -452,7 +452,7 @@ public class KafkaCrossDcConsumerTest {
public void testHandleInvalidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer =
mock(KafkaConsumer.class);
SolrMessageProcessor mockSolrMessageProcessor =
mock(SolrMessageProcessor.class);
- doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED,
null))
.when(mockSolrMessageProcessor)
.handleItem(any());
KafkaCrossDcConsumer spyConsumer =
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
index 2a1b552ab26..6110cfbbbad 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
@@ -18,7 +18,7 @@ package org.apache.solr.crossdc.manager.messageprocessor;
import static org.apache.solr.SolrTestCaseJ4.assumeWorkingMockito;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -76,6 +76,7 @@ public class SolrMessageProcessorTest {
solrMessageProcessor.handleItem(mirroredSolrRequest);
assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+ assertNotNull(result.getItem());
}
/** Should handle MirroredSolrRequest and return a failed result with
resubmit */
@@ -92,6 +93,7 @@ public class SolrMessageProcessorTest {
solrMessageProcessor.handleItem(mirroredSolrRequest);
assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+ assertNotNull(result.getItem());
assertEquals(mirroredSolrRequest, result.getItem());
}
@@ -111,7 +113,7 @@ public class SolrMessageProcessorTest {
solrMessageProcessor.handleItem(mirroredSolrRequest);
assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
- assertNull(result.getItem());
+ assertNotNull(result.getItem());
}
/** Should connect to Solr if not connected and process the request */
diff --git
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
index f4fa5f7ae82..145630deee7 100644
---
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
+++
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -39,16 +39,8 @@ public interface IQueueHandler<T> {
private final Throwable _throwable;
private final T _item;
- public Result(final ResultStatus status) {
- _status = status;
- _throwable = null;
- _item = null;
- }
-
- public Result(final ResultStatus status, final Throwable throwable) {
- _status = status;
- _throwable = throwable;
- _item = null;
+ public Result(final ResultStatus status, final T newItem) {
+ this(status, null, newItem);
}
public Result(final ResultStatus status, final Throwable throwable, final
T newItem) {