guozhangwang commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r449799110
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, if (oldTask.isActive()) { final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); + cleanUpTaskProducer(oldTask, taskCloseExceptions); } else { oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already Review comment: I think @vvcephei 's confusion comes from a change that we now require all tasks to be transited to `suspended` before transiting to `close`: previously, we allow e.g. a running task to be closed immediately and inside the task itself the logic actually did the "running -> suspend -> close" logic, i.e. it is totally agnostic to the TM. In a refactoring with eos-beta we changed it. So now the responsibility is kinda split between the two: TM needs to make sure the transition is valid and the task verifies it. By doing this we avoided the "pre-/post-" functions. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org