This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new be58522c858 fix: Improved error handling in
SeekableStreamIndexTaskRunner. (#19218)
be58522c858 is described below
commit be58522c858e51649dd026e886c2e9c03463ed13
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Mar 27 13:22:01 2026 -0700
fix: Improved error handling in SeekableStreamIndexTaskRunner. (#19218)
The main improvement is that "persist" is moved out of a finally block,
and now happens only on the normal (non-error) path. This has two benefits. First,
there is no point in persisting on the error path, and the in-memory
index might be in a bad state anyway at that point. Second, moving the
persist call out of "finally" fixes an issue where an exception thrown
from "persist" would cause an exception thrown from "add" to be lost.
This can come up in production when the in-memory index grows too large,
causing the main code to throw an OutOfMemoryError, and then something
goes wrong with the persist too. In this situation the original
OutOfMemoryError would not have been logged.
A secondary improvement is that we catch Throwable rather than Exception,
both to trigger cleanup and to handle errors that occur during cleanup.
This ensures we don't miss cleanup tasks when an Error is thrown by
the main code, and that we don't lose the original exception if an Error
is thrown by the cleanup code.
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 4 +--
.../SeekableStreamIndexTaskRunner.java | 38 ++++++++--------------
2 files changed, 15 insertions(+), 27 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4c0ca3284d3..1374cc4abf7 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1752,8 +1752,8 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
- Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
- Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
+ Assert.assertEquals(0, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(0, observedSegmentGenerationMetrics.numPersists());
Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 0f986dfc9c4..c4fa66e1b9e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -445,7 +445,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
)
);
- Throwable caughtExceptionOuter = null;
+ Throwable caughtException = null;
//milliseconds waited for created segments to be handed off
long handoffWaitMs = 0L;
@@ -607,8 +607,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// Could eventually support leader/follower mode (for keeping replicas
more in sync)
boolean stillReading = !assignment.isEmpty();
status = Status.READING;
- Throwable caughtExceptionInner = null;
-
try {
while (stillReading) {
if (possiblyPause()) {
@@ -809,9 +807,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
}
- catch (Exception e) {
+ catch (Throwable e) {
// (1) catch all exceptions while reading from kafka
- caughtExceptionInner = e;
if (Throwables.getRootCause(e) instanceof InterruptedException) {
// Suppress InterruptedException stack trace to avoid flooding the
logs
log.error("Encounted InterrupedException in run() before
persisting");
@@ -821,20 +818,11 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
throw e;
}
finally {
- try {
- // To handle cases where tasks stop reading due to stop request or
exceptions
- segmentGenerationMetrics.markProcessingDone();
- driver.persist(committerSupplier.get()); // persist pending data
- }
- catch (Exception e) {
- if (caughtExceptionInner != null) {
- caughtExceptionInner.addSuppressed(e);
- } else {
- throw e;
- }
- }
+ segmentGenerationMetrics.markProcessingDone();
}
+ driver.persist(committerSupplier.get()); // persist pending data
+
synchronized (statusLock) {
if (stopRequested.get() && !publishOnStop.get()) {
throw new InterruptedException("Stopping without publishing");
@@ -861,7 +849,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// Committer is built.)
sequenceMetadata.updateAssignments(currOffsets,
this::isMoreToReadAfterReadingRecord);
publishingSequences.add(sequenceMetadata.getSequenceName());
- // persist already done in finally, so directly add to publishQueue
+ // persist already done above, so directly add to publishQueue
publishAndRegisterHandoff(sequenceMetadata);
}
}
@@ -913,7 +901,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
catch (InterruptedException | RejectedExecutionException e) {
// (2) catch InterruptedException and RejectedExecutionException thrown
for the whole ingestion steps including
// the final publishing.
- caughtExceptionOuter = e;
+ caughtException = e;
try {
Futures.allAsList(publishWaitList).cancel(true);
Futures.allAsList(handOffWaitList).cancel(true);
@@ -921,7 +909,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
appenderator.closeNow();
}
}
- catch (Exception e2) {
+ catch (Throwable e2) {
e.addSuppressed(e2);
}
@@ -937,9 +925,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
throw e;
}
}
- catch (Exception e) {
+ catch (Throwable e) {
// (3) catch all other exceptions thrown for the whole ingestion steps
including the final publishing.
- caughtExceptionOuter = e;
+ caughtException = e;
try {
Futures.allAsList(publishWaitList).cancel(true);
Futures.allAsList(handOffWaitList).cancel(true);
@@ -947,7 +935,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
appenderator.closeNow();
}
}
- catch (Exception e2) {
+ catch (Throwable e2) {
e.addSuppressed(e2);
}
throw e;
@@ -966,8 +954,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
rejectionPeriodUpdaterExec.shutdown();
}
catch (Throwable e) {
- if (caughtExceptionOuter != null) {
- caughtExceptionOuter.addSuppressed(e);
+ if (caughtException != null) {
+ caughtException.addSuppressed(e);
} else {
throw e;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]