guozhangwang commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r450471701
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

```diff
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             if (oldTask.isActive()) {
                 final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                 newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                cleanUpTaskProducer(oldTask, taskCloseExceptions);
             } else {
                 oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already
```

Review comment:

Hey folks, taking a step back here: there are a few different principles we could follow regarding the class hierarchy.

1) The TM (`TaskManager`) is totally agnostic to the task's state, and each task method transitions through multiple states if necessary. E.g. if the TM calls `close` on a running task, the task first transitions to suspended and then to closed; if the TM calls `suspend` on an already-suspended task, it is a no-op. In this case the TM could just blindly call `close` or `suspend` when closing / recycling tasks, and the task's methods would handle the state transitions. This would be ideal, but the current issue is that today we need to commit after suspending a task (i.e. `handleRevocation` commits the suspended active tasks, while `handleAssignment` commits the suspended standby tasks). Hence, if we blindly suspend and commit a task, we may unnecessarily commit an already-suspended task twice. Right now, to work around this issue, we choose to:

2) Let the TM be aware of the task's state transition rules and try to obey them. Similarly, we could also let the TM check the task's state and then call suspend and commit conditionally. At the moment only standby tasks would not already be suspended, so we would effectively end up with the same logic of only suspending standby tasks. So checking either `task.isActive` or `task.isSuspended` would work the same.
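To make principle 1) concrete, here is a minimal sketch assuming a hypothetical `SketchTask` class with a three-value `State` enum (`RUNNING`, `SUSPENDED`, `CLOSED`); these are illustrative names, not the real Kafka Streams task classes. `close()` walks through `SUSPENDED` on its own and a repeated `suspend()` is a no-op, so the caller never has to inspect the state. The hypothetical `commit()` counter shows the remaining problem described above: a blind suspend-then-commit on an already-suspended task still commits twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified task; not the real Kafka Streams Task class.
public class SketchTask {
    public enum State { RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;
    private final List<State> transitions = new ArrayList<>();
    private int commitCount = 0;

    public State state() { return state; }
    public List<State> transitions() { return transitions; }
    public int commitCount() { return commitCount; }

    // Idempotent: only a running task actually transitions to SUSPENDED.
    public void suspend() {
        if (state == State.CLOSED) {
            throw new IllegalStateException("cannot suspend a closed task");
        }
        if (state == State.RUNNING) {
            transitionTo(State.SUSPENDED);
        }
    }

    // Walks RUNNING -> SUSPENDED -> CLOSED as needed; a no-op if already closed.
    public void close() {
        if (state == State.CLOSED) {
            return;
        }
        suspend();
        transitionTo(State.CLOSED);
    }

    // Stand-in for a commit: unlike suspend(), it has no guard against repeats.
    public void commit() { commitCount++; }

    private void transitionTo(State next) {
        transitions.add(next);
        state = next;
    }

    public static void main(String[] args) {
        SketchTask closing = new SketchTask();
        closing.close();
        System.out.println(closing.transitions()); // [SUSPENDED, CLOSED]

        SketchTask revoked = new SketchTask();
        revoked.suspend();
        revoked.commit();   // e.g. the commit done during handleRevocation
        revoked.suspend();  // blind call from the TM: a no-op
        revoked.commit();   // ...but this second commit is not
        System.out.println(revoked.commitCount()); // 2
    }
}
```

Principle 2) amounts to the TM guarding that second commit itself by checking the task's state before calling suspend and commit.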
Personally, I think that if we want the TM to be purely agnostic to task state transitions, an alternative approach could be to make `preCommit` / `postCommit` "idempotent" as well. E.g. if the task remembers that no records have been processed since the last `preCommit`, then a second call to `preCommit` would be a no-op that returns an empty map; similarly, `postCommit` could be a no-op if no records have been processed since its last checkpoint. Then in the TM we can blindly call:

```
task.suspend();
aggregateOffsets(task.preCommit());  // empty if nothing was processed since the last preCommit
if (!offsetsMap.isEmpty()) {
    sendOffsets();
}
task.postCommit();                   // no-op if nothing was processed since the last checkpoint
if (task.isActive()) {
    // recycle as standby
} else {
    // recycle as active
}
```

Similarly, for `tasksToClose` we can blindly call `suspend` / `preCommit` / `postCommit` first. WDYT? Anyway, I think this discussion can continue in a follow-up PR (maybe I can incorporate whatever we agree on into the PR that decouples flushing from committing), and we can merge this one as-is.
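The idempotent `preCommit` / `postCommit` idea can be sketched as follows, assuming a hypothetical `IdempotentTask` class that tracks whether any records were processed since the last commit and since the last checkpoint. Offsets are kept in a plain `Map<String, Long>` for brevity; Kafka itself keys committed offsets by `TopicPartition`, and the real task methods and signatures may differ. The hypothetical `commitAll` helper plays the TM's role: it blindly pre-commits every task, aggregates the offsets, and post-commits every task.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical task with idempotent preCommit/postCommit; not the real Kafka Streams Task API.
public class IdempotentTask {
    private final Map<String, Long> nextOffsets = new HashMap<>();
    private boolean processedSinceCommit = false;
    private boolean processedSinceCheckpoint = false;
    private int checkpointCount = 0;

    // Simulates processing the record at `offset` of `partition`.
    public void process(String partition, long offset) {
        nextOffsets.put(partition, offset + 1);
        processedSinceCommit = true;
        processedSinceCheckpoint = true;
    }

    // Returns an empty map (a no-op) if nothing was processed since the last preCommit.
    public Map<String, Long> preCommit() {
        if (!processedSinceCommit) {
            return Collections.emptyMap();
        }
        processedSinceCommit = false;
        return new HashMap<>(nextOffsets);
    }

    // A no-op if nothing was processed since the last checkpoint.
    public void postCommit() {
        if (processedSinceCheckpoint) {
            processedSinceCheckpoint = false;
            checkpointCount++; // stand-in for writing a checkpoint
        }
    }

    public int checkpointCount() {
        return checkpointCount;
    }

    // Hypothetical TM-side helper: blindly pre/post-commit all tasks and return the
    // aggregated offsets; the caller sends them only if the map is non-empty.
    static Map<String, Long> commitAll(List<IdempotentTask> tasks) {
        Map<String, Long> offsetsMap = new HashMap<>();
        for (IdempotentTask task : tasks) {
            offsetsMap.putAll(task.preCommit());
        }
        for (IdempotentTask task : tasks) {
            task.postCommit();
        }
        return offsetsMap;
    }

    public static void main(String[] args) {
        IdempotentTask task = new IdempotentTask();
        task.process("input-0", 41L);
        System.out.println(commitAll(List.of(task))); // {input-0=42}
        System.out.println(commitAll(List.of(task))); // {} (second commit is a no-op)
        System.out.println(task.checkpointCount());   // 1
    }
}
```

Note that this sketch clears the "processed" flag inside `preCommit` itself, so a commit that fails after `preCommit` returns would need extra handling in a real implementation.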