cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r968441157


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,96 @@ private void classifyTasksWithoutStateUpdater(final 
Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> 
tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> 
tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> 
tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be 
managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = 
activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, 
topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, 
activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> 
tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, 
tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> 
inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }
+    }
+
+    private void moveTaskFromTasksRegistryToStateUpdater(final Task task) {
+        tasks.removeTask(task);
+        stateUpdater.add(task);
+    }
+
+    private void handleTasksInStateUpdater(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate) {
         for (final Task task : stateUpdater.getTasks()) {
             final TaskId taskId = task.id();
-            final Set<TopicPartition> topicPartitions = 
activeTasksToCreate.get(taskId);
             if (activeTasksToCreate.containsKey(taskId)) {
+                final Set<TopicPartition> inputPartitions = 
activeTasksToCreate.get(taskId);
                 if (task.isActive()) {
-                    if (!task.inputPartitions().equals(topicPartitions)) {
-                        stateUpdater.remove(taskId);
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, 
topicPartitions);
-                    }
+                    updateInputPartitionsOrRemoveTaskFromTasksToSuspend(task, 
inputPartitions);
                 } else {
-                    stateUpdater.remove(taskId);
-                    tasks.addPendingTaskToRecycle(taskId, topicPartitions);
+                    removeTaskToRecycleFromStateUpdater(taskId, 
inputPartitions);
                 }
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    if (!task.inputPartitions().equals(topicPartitions)) {
-                        stateUpdater.remove(taskId);
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, 
topicPartitions);
-                    }
-                } else {
-                    stateUpdater.remove(taskId);
-                    tasks.addPendingTaskToRecycle(taskId, topicPartitions);
+                if (task.isActive()) {
+                    removeTaskToRecycleFromStateUpdater(taskId, 
standbyTasksToCreate.get(taskId));
                 }
                 standbyTasksToCreate.remove(taskId);
             } else {
-                stateUpdater.remove(taskId);
-                tasks.addPendingTaskToCloseClean(taskId);
+                removeUnusedTaskFromStateUpdater(taskId);
             }
         }
     }
 
+    private void updateInputPartitionsOrRemoveTaskFromTasksToSuspend(final 
Task task,
+                                                                     final 
Set<TopicPartition> inputPartitions) {
+        final TaskId taskId = task.id();
+        if (!task.inputPartitions().equals(inputPartitions)) {
+            stateUpdater.remove(taskId);
+            tasks.addPendingTaskToUpdateInputPartitions(taskId, 
inputPartitions);
+        } else {
+            tasks.removePendingActiveTaskToSuspend(taskId);

Review Comment:
   We only remove a task from the task to suspend if the task is re-assigned as 
active. For all other cases the task is added to a different set of a pending 
update action (e.g. update input partitions) which removes it from the set of 
tasks to suspend automatically since a task can only be a member of at most one 
set of pending update actions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to