Repository: kafka
Updated Branches:
  refs/heads/trunk 3b36d5cff -> 4db048d61
MINOR: don't throw CommitFailedException during suspendTasksAndState

Author: Damian Guy <[email protected]>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2535 from dguy/minor-commit-failed

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4db048d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4db048d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4db048d6

Branch: refs/heads/trunk
Commit: 4db048d61206bc6efbd143d6293216b7cb4b86c5
Parents: 3b36d5c
Author: Damian Guy <[email protected]>
Authored: Tue Feb 14 10:40:39 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Feb 14 10:43:34 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4db048d6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index fba3db5..12d472b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -477,7 +477,9 @@ public class StreamThread extends Thread {
         firstException.compareAndSet(null, flushAllState());
         // only commit after all state has been flushed and there hasn't been an exception
         if (firstException.get() == null) {
-            firstException.set(commitOffsets());
+            // TODO: currently commit failures will not be thrown to users
+            // while suspending tasks; this need to be re-visit after KIP-98
+            commitOffsets();
         }
         // remove the changelog partitions from restore consumer
         firstException.compareAndSet(null, unAssignChangeLogPartitions());
