Repository: kafka Updated Branches: refs/heads/trunk a5a9a901e -> c22c1775a
KAFKA-5541: Follow-up; Try to clean uncleanly upon clean close failure before throwing the exception Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #4046 from mjsax/kafka-5541-minor-follow-up Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c22c1775 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c22c1775 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c22c1775 Branch: refs/heads/trunk Commit: c22c1775a550dbefe6bd4cdcf8820404351257a8 Parents: a5a9a90 Author: Matthias J. Sax <matth...@confluent.io> Authored: Tue Oct 10 15:16:53 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Oct 10 15:16:53 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/AssignedTasks.java | 34 +++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c22c1775/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 680bbd3..7426d6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -507,12 +507,17 @@ class AssignedTasks implements RestoringTasks { } catch (final TaskMigratedException e) { firstException.compareAndSet(null, closeZombieTask(task)); } catch (final RuntimeException t) { - firstException.compareAndSet(null, t); log.error("Failed while closing {} {} due to the following error:", task.getClass().getSimpleName(), task.id(), t); - firstException.compareAndSet(null, closeUncleanIfRequired(task, clean)); + if (clean) { + if (!closeUnclean(task)) { + firstException.compareAndSet(null, t); + } + } else { + firstException.compareAndSet(null, t); + } } } @@ -524,21 +529,18 @@ class AssignedTasks implements RestoringTasks { } } - private RuntimeException closeUncleanIfRequired(final Task task, - final boolean triedToCloseCleanlyPreviously) { - if (triedToCloseCleanlyPreviously) { - log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id()); - try { - task.close(false, false); - } catch (final RuntimeException fatalException) { - log.error("Failed while closing {} {} due to the following error:", - task.getClass().getSimpleName(), - task.id(), - fatalException); - return fatalException; - } + private boolean closeUnclean(final Task task) { + log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id()); + try { + task.close(false, false); + } catch (final RuntimeException fatalException) { + log.error("Failed while closing {} {} due to the following error:", + task.getClass().getSimpleName(), + task.id(), + fatalException); + return false; } - return null; + return true; } }