This is an automated email from the ASF dual-hosted git repository. fjy pushed a commit to branch 0.12.3 in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push: new 05c802d Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260) 05c802d is described below commit 05c802d60fa8b5a5640c1f423f2df37c5577d309 Author: Jonathan Wei <jon-...@users.noreply.github.com> AuthorDate: Tue Aug 28 17:33:05 2018 -0700 Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260) --- .../io/druid/indexing/kafka/KafkaIndexTask.java | 65 +++++++++++++++++----- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d0590dc..6ace135 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -419,6 +419,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler ) ); + Throwable caughtExceptionOuter = null; try (final KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); @@ -512,6 +513,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler // Could eventually support leader/follower mode (for keeping replicas more in sync) boolean stillReading = !assignment.isEmpty(); status = Status.READING; + Throwable caughtExceptionInner = null; try { while (stillReading) { if (possiblyPause()) { @@ -717,12 +719,22 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler } catch (Exception e) { // (1) catch all exceptions while reading from kafka + caughtExceptionInner = e; log.error(e, "Encountered exception in run() before persisting."); throw e; } finally { log.info("Persisting all pending data"); - driver.persist(committerSupplier.get()); // persist pending data + try { + driver.persist(committerSupplier.get()); // persist pending data + } + catch (Exception e) { + if (caughtExceptionInner != null) { + caughtExceptionInner.addSuppressed(e); + } else { + throw e; + } + } } synchronized (statusLock) { @@ -792,9 +804,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler catch (InterruptedException | RejectedExecutionException e) { // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including // the final publishing. - Futures.allAsList(publishWaitList).cancel(true); - Futures.allAsList(handOffWaitList).cancel(true); - appenderator.closeNow(); + caughtExceptionOuter = e; + try { + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); + if (appenderator != null) { + appenderator.closeNow(); + } + } + catch (Exception e2) { + e.addSuppressed(e2); + } // handle the InterruptedException that gets wrapped in a RejectedExecutionException if (e instanceof RejectedExecutionException && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) { @@ -811,21 +831,38 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler } catch (Exception e) { // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. - Futures.allAsList(publishWaitList).cancel(true); - Futures.allAsList(handOffWaitList).cancel(true); - appenderator.closeNow(); + caughtExceptionOuter = e; + try { + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); + if (appenderator != null) { + appenderator.closeNow(); + } + } + catch (Exception e2) { + e.addSuppressed(e2); + } throw e; } finally { - if (driver != null) { - driver.close(); + try { + if (driver != null) { + driver.close(); + } + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().unregister(getId()); + } + + toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); + toolbox.getDataSegmentServerAnnouncer().unannounce(); } - if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(getId()); + catch (Exception e) { + if (caughtExceptionOuter != null) { + caughtExceptionOuter.addSuppressed(e); + } else { + throw e; + } } - - toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); - toolbox.getDataSegmentServerAnnouncer().unannounce(); } return success(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org