This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new be58522c858 fix: Improved error handling in SeekableStreamIndexTaskRunner. (#19218)
be58522c858 is described below

commit be58522c858e51649dd026e886c2e9c03463ed13
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Mar 27 13:22:01 2026 -0700

    fix: Improved error handling in SeekableStreamIndexTaskRunner. (#19218)
    
    The main improvement is that "persist" is moved out of a finally block,
    and now only happens on the normal path. This has two benefits. First,
    there is no point in persisting on the error path, and the in-memory
    index might be in a bad state anyway at that point. Second, moving the
    persist call out of "finally" fixes an issue where an exception thrown
    from "persist" would cause an exception thrown from "add" to be lost.
    
    This can come up in production when the in-memory index grows too large,
    causing the main code to throw an OutOfMemoryError, and then something
    goes wrong with the persist too. In this situation the original
    OutOfMemoryError would not have been logged.
    
    A secondary improvement is that we catch Throwable rather than Exception,
    both to trigger cleanup and when handling errors that occur during cleanup.
    This ensures we don't miss cleanup tasks when an Error is thrown by
    the main code, and that we don't lose the original exception if an Error
    is thrown by the cleanup code.
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  4 +--
 .../SeekableStreamIndexTaskRunner.java             | 38 ++++++++--------------
 2 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4c0ca3284d3..1374cc4abf7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1752,8 +1752,8 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
     Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
-    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
-    Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
+    Assert.assertEquals(0, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(0, observedSegmentGenerationMetrics.numPersists());
     Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 0f986dfc9c4..c4fa66e1b9e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -445,7 +445,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         )
     );
 
-    Throwable caughtExceptionOuter = null;
+    Throwable caughtException = null;
 
     //milliseconds waited for created segments to be handed off
     long handoffWaitMs = 0L;
@@ -607,8 +607,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       // Could eventually support leader/follower mode (for keeping replicas 
more in sync)
       boolean stillReading = !assignment.isEmpty();
       status = Status.READING;
-      Throwable caughtExceptionInner = null;
-
       try {
         while (stillReading) {
           if (possiblyPause()) {
@@ -809,9 +807,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
         }
       }
-      catch (Exception e) {
+      catch (Throwable e) {
         // (1) catch all exceptions while reading from kafka
-        caughtExceptionInner = e;
         if (Throwables.getRootCause(e) instanceof InterruptedException) {
           // Suppress InterruptedException stack trace to avoid flooding the 
logs
           log.error("Encounted InterrupedException in run() before 
persisting");
@@ -821,20 +818,11 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         throw e;
       }
       finally {
-        try {
-          // To handle cases where tasks stop reading due to stop request or 
exceptions
-          segmentGenerationMetrics.markProcessingDone();
-          driver.persist(committerSupplier.get()); // persist pending data
-        }
-        catch (Exception e) {
-          if (caughtExceptionInner != null) {
-            caughtExceptionInner.addSuppressed(e);
-          } else {
-            throw e;
-          }
-        }
+        segmentGenerationMetrics.markProcessingDone();
       }
 
+      driver.persist(committerSupplier.get()); // persist pending data
+
       synchronized (statusLock) {
         if (stopRequested.get() && !publishOnStop.get()) {
           throw new InterruptedException("Stopping without publishing");
@@ -861,7 +849,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           // Committer is built.)
           sequenceMetadata.updateAssignments(currOffsets, 
this::isMoreToReadAfterReadingRecord);
           publishingSequences.add(sequenceMetadata.getSequenceName());
-          // persist already done in finally, so directly add to publishQueue
+          // persist already done above, so directly add to publishQueue
           publishAndRegisterHandoff(sequenceMetadata);
         }
       }
@@ -913,7 +901,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     catch (InterruptedException | RejectedExecutionException e) {
       // (2) catch InterruptedException and RejectedExecutionException thrown 
for the whole ingestion steps including
       // the final publishing.
-      caughtExceptionOuter = e;
+      caughtException = e;
       try {
         Futures.allAsList(publishWaitList).cancel(true);
         Futures.allAsList(handOffWaitList).cancel(true);
@@ -921,7 +909,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           appenderator.closeNow();
         }
       }
-      catch (Exception e2) {
+      catch (Throwable e2) {
         e.addSuppressed(e2);
       }
 
@@ -937,9 +925,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         throw e;
       }
     }
-    catch (Exception e) {
+    catch (Throwable e) {
       // (3) catch all other exceptions thrown for the whole ingestion steps 
including the final publishing.
-      caughtExceptionOuter = e;
+      caughtException = e;
       try {
         Futures.allAsList(publishWaitList).cancel(true);
         Futures.allAsList(handOffWaitList).cancel(true);
@@ -947,7 +935,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           appenderator.closeNow();
         }
       }
-      catch (Exception e2) {
+      catch (Throwable e2) {
         e.addSuppressed(e2);
       }
       throw e;
@@ -966,8 +954,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         rejectionPeriodUpdaterExec.shutdown();
       }
       catch (Throwable e) {
-        if (caughtExceptionOuter != null) {
-          caughtExceptionOuter.addSuppressed(e);
+        if (caughtException != null) {
+          caughtException.addSuppressed(e);
         } else {
           throw e;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to