mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440554249
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -714,13 +696,20 @@ void shutdown(final boolean clean) { } } - if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + try { + if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { + commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + } + for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) { + final Task task = tasks.get(taskId); + task.postCommit(); + } + } catch (final RuntimeException e) { + firstException.compareAndSet(null, e); Review comment: I meant the later. And I agree that if `commit` fails, we should not call `postCommit()`. For failure in `postCommit`: we make assumptions about the current code what seems dangerous (ie, not future prove)? -- IMHO, if `postCommit` fails, we need to close the corresponding task dirty and either recreate it, or rebalance, but we should also continue to call `postCommit()` for all other tasks? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org