mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440554249



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : 
consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       I meant the later. And I agree that if `commit` fails, we should not 
call `postCommit()`.
   
   For failure in `postCommit`: we make assumptions about the current code what 
seems dangerous (ie, not future prove)? -- IMHO, if `postCommit` fails, we need 
to close the corresponding task dirty and either recreate it, or rebalance, but 
we should also continue to call `postCommit()` for all other tasks?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to