jon-wei closed pull request #6258: Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner
URL: https://github.com/apache/incubator-druid/pull/6258
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged,
it is reproduced below for the sake of provenance:

Because this pull request comes from a fork, the diff is included
below; GitHub would not otherwise display it:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 434e16eda4b..660ee016edb 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -317,6 +317,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
         )
     );
 
+    Throwable caughtExceptionOuter = null;
     try (final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -412,6 +413,7 @@ public void run()
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
       boolean stillReading = !assignment.isEmpty();
       status = Status.READING;
+      Throwable caughtExceptionInner = null;
       try {
         while (stillReading) {
           if (possiblyPause()) {
@@ -616,12 +618,22 @@ public void onFailure(Throwable t)
       }
       catch (Exception e) {
         // (1) catch all exceptions while reading from kafka
+        caughtExceptionInner = e;
         log.error(e, "Encountered exception in run() before persisting.");
         throw e;
       }
       finally {
         log.info("Persisting all pending data");
-        driver.persist(committerSupplier.get()); // persist pending data
+        try {
+          driver.persist(committerSupplier.get()); // persist pending data
+        }
+        catch (Exception e) {
+          if (caughtExceptionInner != null) {
+            caughtExceptionInner.addSuppressed(e);
+          } else {
+            throw e;
+          }
+        }
       }
 
       synchronized (statusLock) {
@@ -687,9 +699,18 @@ public void onFailure(Throwable t)
     catch (InterruptedException | RejectedExecutionException e) {
       // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
       // the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
+
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
       if (e instanceof RejectedExecutionException
           && (e.getCause() == null || !(e.getCause() instanceof 
InterruptedException))) {
@@ -706,21 +727,38 @@ public void onFailure(Throwable t)
     }
     catch (Exception e) {
       // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
       throw e;
     }
     finally {
-      if (driver != null) {
-        driver.close();
+      try {
+        if (driver != null) {
+          driver.close();
+        }
+        if (chatHandlerProvider.isPresent()) {
+          chatHandlerProvider.get().unregister(task.getId());
+        }
+
+        toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+        toolbox.getDataSegmentServerAnnouncer().unannounce();
       }
-      if (chatHandlerProvider.isPresent()) {
-        chatHandlerProvider.get().unregister(task.getId());
+      catch (Exception e) {
+        if (caughtExceptionOuter != null) {
+          caughtExceptionOuter.addSuppressed(e);
+        } else {
+          throw e;
+        }
       }
-
-      toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
-      toolbox.getDataSegmentServerAnnouncer().unannounce();
     }
 
     toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to