This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.3 by this push:
     new 05c802d  Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260)
05c802d is described below

commit 05c802d60fa8b5a5640c1f423f2df37c5577d309
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Tue Aug 28 17:33:05 2018 -0700

    Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260)
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    | 65 +++++++++++++++++-----
 1 file changed, 51 insertions(+), 14 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d0590dc..6ace135 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -419,6 +419,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
         )
     );
 
+    Throwable caughtExceptionOuter = null;
     try (final KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -512,6 +513,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
       boolean stillReading = !assignment.isEmpty();
       status = Status.READING;
+      Throwable caughtExceptionInner = null;
       try {
         while (stillReading) {
           if (possiblyPause()) {
@@ -717,12 +719,22 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       }
       catch (Exception e) {
         // (1) catch all exceptions while reading from kafka
+        caughtExceptionInner = e;
         log.error(e, "Encountered exception in run() before persisting.");
         throw e;
       }
       finally {
         log.info("Persisting all pending data");
-        driver.persist(committerSupplier.get()); // persist pending data
+        try {
+          driver.persist(committerSupplier.get()); // persist pending data
+        }
+        catch (Exception e) {
+          if (caughtExceptionInner != null) {
+            caughtExceptionInner.addSuppressed(e);
+          } else {
+            throw e;
+          }
+        }
       }
 
       synchronized (statusLock) {
@@ -792,9 +804,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     catch (InterruptedException | RejectedExecutionException e) {
       // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
       // the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
       if (e instanceof RejectedExecutionException
           && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
@@ -811,21 +831,38 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     }
     catch (Exception e) {
       // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
       throw e;
     }
     finally {
-      if (driver != null) {
-        driver.close();
+      try {
+        if (driver != null) {
+          driver.close();
+        }
+        if (chatHandlerProvider.isPresent()) {
+          chatHandlerProvider.get().unregister(getId());
+        }
+
+        toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+        toolbox.getDataSegmentServerAnnouncer().unannounce();
       }
-      if (chatHandlerProvider.isPresent()) {
-        chatHandlerProvider.get().unregister(getId());
+      catch (Exception e) {
+        if (caughtExceptionOuter != null) {
+          caughtExceptionOuter.addSuppressed(e);
+        } else {
+          throw e;
+        }
       }
-
-      toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
-      toolbox.getDataSegmentServerAnnouncer().unannounce();
     }
 
     return success();

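For reference, the pattern applied throughout this change is the standard Java suppressed-exception idiom: remember the primary failure, let cleanup code in catch/finally attach its own failure via Throwable.addSuppressed() instead of replacing the primary, and only propagate a cleanup failure when nothing else has gone wrong. A minimal standalone sketch of that idiom (the names SuppressedExceptionSketch, runIngestion, readAndIndex, and persistPending are illustrative stand-ins, not code from the patch):

    // Minimal sketch of the suppressed-exception pattern; names are hypothetical.
    public class SuppressedExceptionSketch
    {
      public static void runIngestion() throws Exception
      {
        Throwable caughtException = null;
        try {
          readAndIndex();                       // main work; may throw
        }
        catch (Exception e) {
          caughtException = e;                  // remember the primary failure
          throw e;                              // and rethrow it unchanged
        }
        finally {
          try {
            persistPending();                   // cleanup; may also throw
          }
          catch (Exception e) {
            if (caughtException != null) {
              caughtException.addSuppressed(e); // keep the primary, attach the cleanup failure
            }
            else {
              throw e;                          // no primary failure, so the cleanup failure propagates
            }
          }
        }
      }

      private static void readAndIndex() throws Exception
      {
        // stand-in for reading from Kafka and indexing records
      }

      private static void persistPending() throws Exception
      {
        // stand-in for driver.persist(...), driver.close(), and the unannounce calls
      }
    }

The patch applies this twice: caughtExceptionInner guards the persist call in the inner finally, and caughtExceptionOuter guards driver.close(), chat handler unregistration, and the unannounce calls in the outer finally, so the exception reported by the task is always the one that actually aborted ingestion.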

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
