ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439555399



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -239,54 +240,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                }
-
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : additionalTasksForCommitting) {
-                    task.postCommit();
-                }
-            } catch (final RuntimeException e) {
-                log.error("Failed to batch commit tasks, " +
-                    "will close all tasks involved in this commit as dirty by the end", e);
-                dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(tasksToClose);
-
-                tasksToClose.clear();
-                // Just add first taskId to re-throw by the end.
-                taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of clean).
-                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
         for (final Task oldTask : tasksToRecycle) {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    // If active, the task should have already been suspended and committed during handleRevocation

Review comment:
Actually, I forgot to update this. We should always call `suspend` here, since some active tasks may not have been suspended during `handleRevocation`.
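
To illustrate the point, here is a minimal, self-contained sketch of suspending unconditionally before recycling. It is not the actual `TaskManager` code: `RecycleSketch`, `SketchTask`, `State`, and `recycleAll` are hypothetical names, and the sketch assumes that suspending an already-suspended task is a harmless no-op.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are the real Kafka Streams classes.
final class RecycleSketch {

    enum State { RUNNING, SUSPENDED }

    // Stand-in for a task, tracking only the state relevant to this discussion.
    static final class SketchTask {
        final String id;
        State state;
        int suspendTransitions = 0; // counts real RUNNING -> SUSPENDED transitions

        SketchTask(final String id, final State state) {
            this.id = id;
            this.state = state;
        }

        // Assumption: suspend is idempotent, so calling it twice is safe.
        void suspend() {
            if (state == State.RUNNING) {
                state = State.SUSPENDED;
                suspendTransitions++;
            }
        }
    }

    // Always suspend before recycling, instead of assuming that an earlier
    // revocation step already suspended every active task.
    static List<String> recycleAll(final List<SketchTask> tasksToRecycle) {
        final List<String> recycledIds = new ArrayList<>();
        for (final SketchTask task : tasksToRecycle) {
            task.suspend();
            if (task.state != State.SUSPENDED) {
                throw new IllegalStateException("Task " + task.id + " was not suspended");
            }
            recycledIds.add(task.id);
        }
        return recycledIds;
    }

    public static void main(final String[] args) {
        final List<SketchTask> tasks = Arrays.asList(
            new SketchTask("0_0", State.SUSPENDED), // suspended during revocation
            new SketchTask("0_1", State.RUNNING));  // never revoked, still running
        System.out.println(recycleAll(tasks));
    }
}
```

With an unconditional call, a task that was never revoked is still suspended before it is recycled, while a task that was already suspended is left untouched.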




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

