This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
     new c9a27e3  Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6258)
c9a27e3 is described below

commit c9a27e3e8e3ad4dc3a67037daa95677b4d3bae2e
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Tue Aug 28 16:12:02 2018 -0700

    Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6258)
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java | 66 +++++++++++++++++-----
 1 file changed, 52 insertions(+), 14 deletions(-)
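For readers skimming the patch below: it applies the standard Java idiom for keeping a primary exception from being masked by a secondary one thrown during cleanup. The failure that aborted the main work is remembered, and any exception raised while cancelling work or releasing resources in a catch or finally block is attached to it via Throwable.addSuppressed() instead of replacing it. The following is a minimal, self-contained sketch of that idiom, with purely illustrative names (doWork, cancelPendingWork, releaseResources), not the actual Druid code:

    public class SuppressedExceptionSketch
    {
      public static void run() throws Exception
      {
        Throwable caughtException = null;
        try {
          doWork(); // the "real" failure, if any, happens here
        }
        catch (Exception e) {
          caughtException = e; // remember the primary exception
          try {
            cancelPendingWork(); // best-effort cleanup on failure
          }
          catch (Exception e2) {
            e.addSuppressed(e2); // don't let cleanup mask the primary exception
          }
          throw e;
        }
        finally {
          try {
            releaseResources(); // runs on success and failure, and may itself fail
          }
          catch (Exception e) {
            if (caughtException != null) {
              caughtException.addSuppressed(e); // keep the primary, record the secondary
            } else {
              throw e; // only the cleanup failed, so propagate it
            }
          }
        }
      }

      private static void doWork() throws Exception
      {
        // hypothetical main work
      }

      private static void cancelPendingWork() throws Exception
      {
        // hypothetical cancellation of in-flight work
      }

      private static void releaseResources() throws Exception
      {
        // hypothetical resource cleanup
      }
    }

Without the nested try/catch blocks, an exception thrown during cancellation or cleanup would discard the one thrown by doWork(); with them, the original exception still propagates and the cleanup failures remain visible through Throwable.getSuppressed().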
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 434e16e..660ee01 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -317,6 +317,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
         )
     );
 
+    Throwable caughtExceptionOuter = null;
     try (final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -412,6 +413,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
       boolean stillReading = !assignment.isEmpty();
       status = Status.READING;
+      Throwable caughtExceptionInner = null;
       try {
         while (stillReading) {
           if (possiblyPause()) {
@@ -616,12 +618,22 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
       }
       catch (Exception e) {
         // (1) catch all exceptions while reading from kafka
+        caughtExceptionInner = e;
         log.error(e, "Encountered exception in run() before persisting.");
         throw e;
       }
       finally {
         log.info("Persisting all pending data");
-        driver.persist(committerSupplier.get()); // persist pending data
+        try {
+          driver.persist(committerSupplier.get()); // persist pending data
+        }
+        catch (Exception e) {
+          if (caughtExceptionInner != null) {
+            caughtExceptionInner.addSuppressed(e);
+          } else {
+            throw e;
+          }
+        }
       }
 
       synchronized (statusLock) {
@@ -687,9 +699,18 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
     catch (InterruptedException | RejectedExecutionException e) {
       // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
       // the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
+
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
       if (e instanceof RejectedExecutionException
           && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
@@ -706,21 +727,38 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
     }
     catch (Exception e) {
       // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
       throw e;
     }
     finally {
-      if (driver != null) {
-        driver.close();
+      try {
+        if (driver != null) {
+          driver.close();
+        }
+        if (chatHandlerProvider.isPresent()) {
+          chatHandlerProvider.get().unregister(task.getId());
+        }
+
+        toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+        toolbox.getDataSegmentServerAnnouncer().unannounce();
       }
-      if (chatHandlerProvider.isPresent()) {
-        chatHandlerProvider.get().unregister(task.getId());
+      catch (Exception e) {
+        if (caughtExceptionOuter != null) {
+          caughtExceptionOuter.addSuppressed(e);
+        } else {
+          throw e;
+        }
       }
-
-      toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
-      toolbox.getDataSegmentServerAnnouncer().unannounce();
     }
 
     toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org