jon-wei closed pull request #6258: Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner
URL: https://github.com/apache/incubator-druid/pull/6258
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 434e16eda4b..660ee016edb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -317,6 +317,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ) ); + Throwable caughtExceptionOuter = null; try (final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); @@ -412,6 +413,7 @@ public void run() // Could eventually support leader/follower mode (for keeping replicas more in sync) boolean stillReading = !assignment.isEmpty(); status = Status.READING; + Throwable caughtExceptionInner = null; try { while (stillReading) { if (possiblyPause()) { @@ -616,12 +618,22 @@ public void onFailure(Throwable t) } catch (Exception e) { // (1) catch all exceptions while reading from kafka + caughtExceptionInner = e; log.error(e, "Encountered exception in run() before persisting."); throw e; } finally { log.info("Persisting all pending data"); - driver.persist(committerSupplier.get()); // persist pending data + try { + driver.persist(committerSupplier.get()); // persist pending data + } + catch (Exception e) { + if (caughtExceptionInner != null) { + caughtExceptionInner.addSuppressed(e); + } else { + throw e; 
+ } + } } synchronized (statusLock) { @@ -687,9 +699,18 @@ public void onFailure(Throwable t) catch (InterruptedException | RejectedExecutionException e) { // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including // the final publishing. - Futures.allAsList(publishWaitList).cancel(true); - Futures.allAsList(handOffWaitList).cancel(true); - appenderator.closeNow(); + caughtExceptionOuter = e; + try { + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); + if (appenderator != null) { + appenderator.closeNow(); + } + } + catch (Exception e2) { + e.addSuppressed(e2); + } + // handle the InterruptedException that gets wrapped in a RejectedExecutionException if (e instanceof RejectedExecutionException && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) { @@ -706,21 +727,38 @@ public void onFailure(Throwable t) } catch (Exception e) { // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. 
- Futures.allAsList(publishWaitList).cancel(true); - Futures.allAsList(handOffWaitList).cancel(true); - appenderator.closeNow(); + caughtExceptionOuter = e; + try { + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); + if (appenderator != null) { + appenderator.closeNow(); + } + } + catch (Exception e2) { + e.addSuppressed(e2); + } throw e; } finally { - if (driver != null) { - driver.close(); + try { + if (driver != null) { + driver.close(); + } + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().unregister(task.getId()); + } + + toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); + toolbox.getDataSegmentServerAnnouncer().unannounce(); } - if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(task.getId()); + catch (Exception e) { + if (caughtExceptionOuter != null) { + caughtExceptionOuter.addSuppressed(e); + } else { + throw e; + } } - - toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); - toolbox.getDataSegmentServerAnnouncer().unannounce(); } toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org