cadonna commented on code in PR #12600: URL: https://github.com/apache/kafka/pull/12600#discussion_r968441157
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -421,73 +421,96 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti } } - private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, - final Map<Task, Set<TopicPartition>> tasksToRecycle, - final Set<Task> tasksToCloseClean) { + private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, + final Map<Task, Set<TopicPartition>> tasksToRecycle, + final Set<Task> tasksToCloseClean) { + handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); + handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate); + } + + private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, + final Map<Task, Set<TopicPartition>> tasksToRecycle, + final Set<Task> tasksToCloseClean) { for (final Task task : tasks.allTasks()) { + if (!task.isActive()) { + throw new IllegalStateException("Standby tasks should only be managed by the state updater"); + } final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { - if (task.isActive()) { - final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId); - if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) { - task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id())); - } - task.resume(); - } else { - throw new IllegalStateException("Standby tasks should only be managed by the state updater"); - } + handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId)); activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { - if 
(!task.isActive()) { - throw new IllegalStateException("Standby tasks should only be managed by the state updater"); - } else { - tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); - } + tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); standbyTasksToCreate.remove(taskId); } else { tasksToCloseClean.add(task); } } } - private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, - final Map<Task, Set<TopicPartition>> tasksToRecycle, - final Set<Task> tasksToCloseClean) { - classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); + private void handleReAssignedActiveTask(final Task task, + final Set<TopicPartition> inputPartitions) { + if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) { + task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); + } + if (task.state() == State.SUSPENDED) { + task.resume(); + moveTaskFromTasksRegistryToStateUpdater(task); + } + } + + private void moveTaskFromTasksRegistryToStateUpdater(final Task task) { + tasks.removeTask(task); + stateUpdater.add(task); + } + + private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) { for (final Task task : stateUpdater.getTasks()) { final TaskId taskId = task.id(); - final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId); if (activeTasksToCreate.containsKey(taskId)) { + final Set<TopicPartition> inputPartitions = activeTasksToCreate.get(taskId); if (task.isActive()) { - if (!task.inputPartitions().equals(topicPartitions)) { - stateUpdater.remove(taskId); - tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions); - } + updateInputPartitionsOrRemoveTaskFromTasksToSuspend(task, inputPartitions); } else { - stateUpdater.remove(taskId); - 
tasks.addPendingTaskToRecycle(taskId, topicPartitions); + removeTaskToRecycleFromStateUpdater(taskId, inputPartitions); } activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { - if (!task.isActive()) { - if (!task.inputPartitions().equals(topicPartitions)) { - stateUpdater.remove(taskId); - tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions); - } - } else { - stateUpdater.remove(taskId); - tasks.addPendingTaskToRecycle(taskId, topicPartitions); + if (task.isActive()) { + removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId)); } standbyTasksToCreate.remove(taskId); } else { - stateUpdater.remove(taskId); - tasks.addPendingTaskToCloseClean(taskId); + removeUnusedTaskFromStateUpdater(taskId); } } } + private void updateInputPartitionsOrRemoveTaskFromTasksToSuspend(final Task task, + final Set<TopicPartition> inputPartitions) { + final TaskId taskId = task.id(); + if (!task.inputPartitions().equals(inputPartitions)) { + stateUpdater.remove(taskId); + tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions); + } else { + tasks.removePendingActiveTaskToSuspend(taskId); Review Comment: We only remove a task from the set of tasks to suspend explicitly if the task is re-assigned as active with unchanged input partitions. In all other cases, the task is added to a different set of pending update actions (e.g., update input partitions), which automatically removes it from the set of tasks to suspend, since a task can be a member of at most one set of pending update actions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org