ableegoldman commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r448681797
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
Just to be clear, we do assert/enforce this, but in the StreamTask and not in the TaskManager. At this point the TaskManager is actually completely agnostic to task state* and all assertions and branching based on state are internal to the Task implementation.

*except in `handleRevocation`, where I just noticed we still filter the commit based on state, which is now unnecessary.
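To illustrate the division of responsibility described in the comment, here is a minimal sketch. It is not the actual Kafka Streams code: `TaskStateSketch`, `SketchTask`, `SketchManager`, and the three-value `State` enum are hypothetical, simplified stand-ins. The assumption is only that the task validates its own transitions while the manager calls into it without looking at the state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT the real Kafka Streams classes.
final class TaskStateSketch {

    enum State { RUNNING, SUSPENDED, CLOSED }

    /** The task owns its state machine: every transition is validated here. */
    static final class SketchTask {
        private State state = State.RUNNING;

        State state() {
            return state;
        }

        void suspend() {
            switch (state) {
                case RUNNING:
                    state = State.SUSPENDED;
                    break;
                case SUSPENDED:
                    // Already suspended: treated as a no-op in this sketch.
                    break;
                default:
                    throw new IllegalStateException("Illegal state " + state + " while suspending");
            }
        }

        void close() {
            if (state != State.SUSPENDED) {
                throw new IllegalStateException("Illegal state " + state + " while closing");
            }
            state = State.CLOSED;
        }
    }

    /** The manager never inspects task state; it calls the task and lets the task enforce. */
    static final class SketchManager {
        List<RuntimeException> closeAll(final List<SketchTask> tasks) {
            final List<RuntimeException> failures = new ArrayList<>();
            for (final SketchTask task : tasks) {
                try {
                    // No "if (task.state() == ...)" filtering here.
                    task.suspend();
                    task.close();
                } catch (final RuntimeException e) {
                    // Collect the failure and keep going with the remaining tasks.
                    failures.add(e);
                }
            }
            return failures;
        }
    }
}
```

With this split, a state-based filter in the manager (like the one still left in `handleRevocation`) only duplicates checks the task already performs, which is why it can be dropped.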