guozhangwang commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r449799110



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       I think @vvcephei's confusion comes from a change: we now require
every task to transition to `suspended` before it transitions to `close`.
Previously, we allowed e.g. a running task to be closed immediately, and the
task itself performed the "running -> suspend -> close" sequence internally,
i.e. the TM did not need to know about it. We changed that in a refactoring
with eos-beta. So now the responsibility is split between the two: the TM
needs to make sure the transition is valid, and the task verifies it. By doing
this we avoided the "pre-/post-" functions.
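
To make the split of responsibility concrete, here is a minimal sketch of the idea, not the actual Kafka Streams code. The names `TaskLifecycleSketch`, `SketchTask`, `State`, and `closeTask` are hypothetical, and the state set is reduced to three states for illustration. The caller (standing in for the TM) orders the transitions, and the task only verifies that each one is legal.

```java
import java.util.EnumSet;
import java.util.Set;

public class TaskLifecycleSketch {

    // Hypothetical, simplified state set; the real task states are richer.
    enum State {
        RUNNING, SUSPENDED, CLOSED;

        // Transitions the task accepts from this state.
        Set<State> validNext() {
            switch (this) {
                case RUNNING:
                    return EnumSet.of(SUSPENDED);
                case SUSPENDED:
                    return EnumSet.of(RUNNING, CLOSED);
                default:
                    return EnumSet.noneOf(State.class);
            }
        }
    }

    // Hypothetical task: it verifies transitions but never chains them itself.
    static final class SketchTask {
        private State state = State.RUNNING;

        State state() {
            return state;
        }

        void suspend() {
            transitionTo(State.SUSPENDED);
        }

        void close() {
            transitionTo(State.CLOSED);
        }

        private void transitionTo(final State next) {
            if (!state.validNext().contains(next)) {
                throw new IllegalStateException(
                    "Invalid transition from " + state + " to " + next);
            }
            state = next;
        }
    }

    // Caller side (the TM's role in this sketch): suspend first, then close.
    static void closeTask(final SketchTask task) {
        if (task.state() == State.RUNNING) {
            task.suspend();
        }
        task.close();
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask();
        closeTask(task);
        System.out.println(task.state());

        try {
            new SketchTask().close(); // skips the suspend step
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch, closing a running task directly throws, which is the verification half; `closeTask` is the half that the caller has to get right.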




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

