ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555918
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
I see. Then I think it makes sense to always attempt to write the checkpoint/call `postCommit` for a task that was successfully committed, regardless of whether something went wrong during `postCommit` with a different task.

And I agree, we should not make assumptions about the current code not throwing, unless it's explicitly in the contract of the method that it will never throw (which is not the case for `postCommit`).
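
To make the suggestion concrete, here is a rough sketch of what per-task handling could look like. It is not the actual `TaskManager` code: the `Task` interface and the `String` task ids are simplified, hypothetical stand-ins for the real Streams types, and `postCommitAll` is a made-up helper name. The only point is that the `try`/`catch` sits inside the loop, so one task throwing from `postCommit` does not stop the others from checkpointing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class PostCommitSketch {

    /** Hypothetical stand-in for the Streams Task type; only postCommit() matters here. */
    public interface Task {
        void postCommit();
    }

    /**
     * Calls postCommit() on every committed task, even if an earlier task threw.
     * Only the first exception is remembered, as with firstException.compareAndSet(null, e).
     */
    public static void postCommitAll(final Map<String, Task> committedTasks,
                                     final AtomicReference<RuntimeException> firstException) {
        for (final Map.Entry<String, Task> entry : committedTasks.entrySet()) {
            try {
                entry.getValue().postCommit();
            } catch (final RuntimeException e) {
                // Record the failure but keep going so the remaining tasks still checkpoint.
                firstException.compareAndSet(null, e);
            }
        }
    }

    public static void main(final String[] args) {
        final List<String> checkpointed = new ArrayList<>();

        // Task ids are plain strings in this sketch (assumption, not the real TaskId type).
        final Map<String, Task> tasks = new LinkedHashMap<>();
        tasks.put("0_0", () -> checkpointed.add("0_0"));
        tasks.put("0_1", () -> {
            throw new IllegalStateException("checkpoint failed for 0_1");
        });
        tasks.put("0_2", () -> checkpointed.add("0_2"));

        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        postCommitAll(tasks, firstException);

        System.out.println("checkpointed: " + checkpointed);
        System.out.println("first exception: " + firstException.get().getMessage());
    }
}
```

In this sketch the failure of task `0_1` is captured in `firstException`, while `0_0` and `0_2` still run their `postCommit`. With a single `try` around the whole loop, `0_2` would have been skipped.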