ableegoldman commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448681797



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(oldTask.id());
                     newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning 
standbys, actives should be suspended already

Review comment:
       Just to be clear, we do assert/enforce this, but in the StreamTask and 
not in the TaskManager. At this point the TaskManager is actually completely 
agnostic to task state* and all assertions and branching based on state is 
internal to the Task implementation.
   
   *except for in `handleRevocation`, where I just noticed we still filter the 
commit based on state, which is now unnecessary




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to