guozhangwang commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r450471701



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       Hey folks, taking a step back here, there are a few different 
principles we could apply to the class hierarchy:
   
   1) TM is totally agnostic to the task's state, and the task's methods 
transition through multiple states if necessary: e.g. if TM calls `close` on a 
running task, the task first transitions to suspended and then to closed; 
if TM calls `suspend` on a suspended task, it is a no-op. In this case, TM 
could just blindly call `close` or `suspend` when closing / recycling tasks, 
and the Task's methods would handle the state transitions.
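   
   To make principle 1 concrete, here is a rough sketch of a task that owns 
its own transitions. `SelfTransitioningTask`, its three states, and the 
`transitions()` counter are hypothetical names for illustration, not the 
actual Task interface; treating `close` on a closed task as a no-op and 
rejecting `suspend` on a closed task are assumptions as well.

```java
// Hypothetical sketch: class, states, and counter are illustrative only,
// not the real Kafka Streams Task implementation.
final class SelfTransitioningTask {
    enum State { RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;
    private int transitions = 0; // counts real state changes, for illustration

    State state() { return state; }
    int transitions() { return transitions; }

    /** Suspends a running task; a no-op if the task is already suspended. */
    void suspend() {
        if (state == State.CLOSED) {
            // Assumption: suspending a closed task is treated as a caller bug.
            throw new IllegalStateException("cannot suspend a closed task");
        }
        if (state == State.RUNNING) {
            transitionTo(State.SUSPENDED);
        }
    }

    /** Closes the task, passing through SUSPENDED first if it is still running. */
    void close() {
        if (state == State.CLOSED) {
            return; // assumption: closing twice is a no-op
        }
        suspend();
        transitionTo(State.CLOSED);
    }

    private void transitionTo(State next) {
        state = next;
        transitions++;
    }

    public static void main(String[] args) {
        SelfTransitioningTask task = new SelfTransitioningTask();
        task.suspend();
        task.suspend(); // second call does nothing
        task.close();
        System.out.println(task.state() + " after " + task.transitions() + " transitions");
    }
}
```

   With something like this, the caller never has to look at the task's state 
before calling `suspend` or `close`.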
   
   This would be ideal, but the current issue is that today we need to commit 
after suspending a task (i.e. handleRevocation commits the suspended active 
tasks, while handleAssignment commits the suspended standby tasks), and hence 
if we blindly suspend and commit a task, we may unnecessarily commit an 
already-suspended task a second time.
   
   Right now to work around this issue, we choose to:
   
   2) Let TM be aware of the task's state transition rules and try to obey 
them. Similarly, we could let TM check the task's state and then call suspend 
and commit conditionally. At the moment, only standby tasks would not yet be 
suspended, so we would effectively end up with the same logic as suspending 
only standby tasks. So checking either `task.isActive` or `task.isSuspended` 
would work the same.
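   
   As a rough illustration of option 2, the sketch below has the manager 
check the task's state before suspending. `StateAwareManagerSketch`, 
`SketchTask`, and `suspendIfNeeded` are made-up names, and the strict 
`suspend` that throws on a double call is an assumption used to model "TM must 
obey the transition rules".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: none of these names are Kafka Streams APIs.
final class StateAwareManagerSketch {
    static final class SketchTask {
        final String id;
        final boolean active;
        boolean suspended;

        SketchTask(String id, boolean active, boolean suspended) {
            this.id = id;
            this.active = active;
            this.suspended = suspended;
        }

        /** Strict transition: the caller is responsible for not suspending twice. */
        void suspend() {
            if (suspended) {
                throw new IllegalStateException("task " + id + " is already suspended");
            }
            suspended = true;
        }
    }

    /** Suspends only the tasks that still need it and returns their ids (these still need a commit). */
    static List<String> suspendIfNeeded(List<SketchTask> tasks) {
        List<String> needCommit = new ArrayList<>();
        for (SketchTask task : tasks) {
            // Equivalent to checking !task.active as long as every active task
            // has already been suspended before this point.
            if (!task.suspended) {
                task.suspend();
                needCommit.add(task.id);
            }
        }
        return needCommit;
    }

    public static void main(String[] args) {
        List<SketchTask> tasks = Arrays.asList(
            new SketchTask("0_0", true, true),    // active, suspended earlier
            new SketchTask("0_1", false, false)); // standby, not yet suspended
        System.out.println(suspendIfNeeded(tasks));
    }
}
```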
   
   Personally I think if we want the TM to be purely agnostic to task state 
transitions, an alternative approach could be to make preCommit / postCommit 
"idempotent" as well, e.g. if the task remembers that it has not processed any 
records since the last `preCommit`, then a second call to `preCommit` would be 
a no-op and return an empty map, and similarly `postCommit` can be a no-op if 
no records have been processed since its last checkpoint. Then in TM, we can 
blindly call:
   
   ```
   task.suspend();
   aggregateOffsets(task.preCommit());
   if (!offsetsMap.isEmpty()) call sendOffsets();
   postOffsets(task.postCommit());
   
   if (task.isActive()) {
      recycle as standby;
   } else {
      recycle as active;
   }
   ```
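   
   For the task side of this idea, a minimal sketch of "idempotent" 
`preCommit` / `postCommit` could look like the following. Everything here is 
an assumption for illustration: `IdempotentCommitTask` is a made-up class, 
String keys and Long offsets stand in for the real partition and offset types, 
and the `checkpoints` counter stands in for actually writing a checkpoint.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a stand-in for a task, not the real Kafka Streams Task.
final class IdempotentCommitTask {
    private final Map<String, Long> consumedOffsets = new HashMap<>();
    private boolean processedSinceLastPreCommit = false;
    private boolean processedSinceLastCheckpoint = false;
    private int checkpoints = 0;

    /** Records that one record was processed from the given partition. */
    void process(String partition, long offset) {
        consumedOffsets.put(partition, offset + 1); // offset to commit = next offset to read
        processedSinceLastPreCommit = true;
        processedSinceLastCheckpoint = true;
    }

    /** Returns the offsets to commit, or an empty map if nothing was processed since the last call. */
    Map<String, Long> preCommit() {
        if (!processedSinceLastPreCommit) {
            return Collections.emptyMap();
        }
        processedSinceLastPreCommit = false;
        return new HashMap<>(consumedOffsets);
    }

    /** Checkpoints only if records were processed since the last checkpoint. */
    void postCommit() {
        if (!processedSinceLastCheckpoint) {
            return;
        }
        processedSinceLastCheckpoint = false;
        checkpoints++; // stands in for writing the checkpoint
    }

    int checkpoints() { return checkpoints; }

    public static void main(String[] args) {
        IdempotentCommitTask task = new IdempotentCommitTask();
        task.process("topic-0", 41L);
        System.out.println("first preCommit:  " + task.preCommit());
        task.postCommit();
        System.out.println("second preCommit: " + task.preCommit());
        task.postCommit(); // no-op, nothing processed since the last checkpoint
        System.out.println("checkpoints: " + task.checkpoints());
    }
}
```

   The second `preCommit` returning an empty map is what would let TM skip 
sending offsets without knowing whether the task was already committed.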
   
   And similarly for tasksToClose we can blindly call `suspend / preCommit / 
postCommit` first. WDYT?
   
   Anyways, I think this discussion can continue in a follow-up PR (maybe I can 
incorporate whatever we've agreed on in the decoupling flushing/committing PR) 
and we can merge this one as-is.



