guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r967380695


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -250,32 +250,502 @@ public void shouldClassifyExistingTasksWithoutStateUpdater() {
     }
 
     @Test
-    public void shouldClassifyExistingTasksWithStateUpdater() {
-        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-        final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, mkSet(t2p2)).build();
-        final StandbyTask standbyTaskToClose = standbyTask(taskId04, mkSet(t2p0)).build();
-        final StreamTask restoringActiveTaskToRecycle = statefulTask(taskId03, mkSet(t1p3)).build();
-        final StreamTask restoringActiveTaskToClose = statefulTask(taskId01, mkSet(t1p1)).build();
-        final Map<TaskId, Set<TopicPartition>> standbyTasks =
-            mkMap(mkEntry(standbyTaskToRecycle.id(), standbyTaskToRecycle.changelogPartitions()));
-        final Map<TaskId, Set<TopicPartition>> restoringActiveTasks = mkMap(
-            mkEntry(restoringActiveTaskToRecycle.id(), restoringActiveTaskToRecycle.changelogPartitions())
+    public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {

Review Comment:
   I made a brief pass over the tests and did not spot any obvious issues. Great 
job on improving the coverage!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   I think we should also remove the task from the other pending sets if it is 
in any of them, since the other pending sets are checked first when handling 
removed / restored tasks from the state updater 
(https://github.com/apache/kafka/pull/12600/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR804), 
right?
   
   I was thinking about the following steps:
   
   T1: a first rebalance completes in which a task is recycled to active; it is 
added to pending-recycle, and we call `stateUpdater.remove`.
   T2: a new rebalance starts; in `handleRevocation` this task is revoked, so 
we add it to the pending-suspend set as well.
   T3: finally the task is returned from the state updater; we check 
pending-recycle first and trigger that logic, which would be wrong.
   
   More generally, I think that, as a principle, a task should be in only one 
of the pending sets in Tasks. So whenever we add a task to a pending set, we 
should make sure it is not in any other pending set.
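   
   A minimal sketch of that principle, not the actual `Tasks` implementation: 
the class `PendingTaskSets`, the `Pending` enum, and the use of plain strings 
as task ids are all hypothetical names chosen for illustration. Keeping a 
single map from task id to pending action means a task cannot be in two 
pending sets at once, so the latest decision (suspend at T2) replaces the 
earlier one (recycle at T1):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the pending-set bookkeeping; NOT Kafka's Tasks class.
final class PendingTaskSets {
    // One constant per pending set mentioned in this review (names are assumptions).
    enum Pending { RECYCLE, CLOSE_CLEAN, CLOSE_DIRTY, UPDATE_INPUT_PARTITIONS, SUSPEND }

    // A single map guarantees that a task has at most one pending action.
    private final Map<String, Pending> pendingByTask = new HashMap<>();

    // Registering a new pending action overwrites any earlier one for the task.
    void add(final Pending kind, final String taskId) {
        pendingByTask.put(taskId, kind);
    }

    // Removes the entry and returns true only if the task is pending for this kind.
    boolean remove(final Pending kind, final String taskId) {
        return pendingByTask.remove(taskId, kind);
    }

    public static void main(final String[] args) {
        final PendingTaskSets pending = new PendingTaskSets();
        pending.add(Pending.RECYCLE, "0_1"); // T1: task marked to be recycled
        pending.add(Pending.SUSPEND, "0_1"); // T2: task revoked, now marked to be suspended
        // T3: the task comes back from the state updater; recycle is checked first
        System.out.println(pending.remove(Pending.RECYCLE, "0_1")); // prints "false"
        System.out.println(pending.remove(Pending.SUSPEND, "0_1")); // prints "true"
    }
}
```

   With separate, independent sets the first check at T3 would have succeeded 
and triggered the stale recycle logic; here it falls through to the suspend 
branch.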



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   Ah right, that makes sense. We'd still need the pending suspend tasks then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

