guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946157896
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ########## @@ -43,35 +43,48 @@ class Tasks { private final Logger log; + // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>(); private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>(); // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or // we receive a new assignment and they are revoked from the thread. - - // Tasks may have been assigned but not yet created because: - // 1. They are for a NamedTopology that is yet known by this host. - // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater. - // - // When that occurs we stash these pending tasks until either they are finally clear to be created, - // or they are revoked from a new assignment. private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set<Task> pendingTasksToInit = new HashSet<>(); private final Set<TaskId> pendingTasksToClose = new HashSet<>(); + // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>(); Tasks(final LogContext logContext) { this.log = logContext.logger(getClass()); } - void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) { - pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks); - pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks); + void purgePendingTasksToCreate() { + pendingActiveTasksToCreate.clear(); + pendingStandbyTasksToCreate.clear(); + } + + Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) { Review Comment: Ack. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ########## @@ -43,35 +43,48 @@ class Tasks { private final Logger log; + // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>(); private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>(); // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or // we receive a new assignment and they are revoked from the thread. - - // Tasks may have been assigned but not yet created because: - // 1. They are for a NamedTopology that is yet known by this host. - // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater. - // - // When that occurs we stash these pending tasks until either they are finally clear to be created, - // or they are revoked from a new assignment. private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set<Task> pendingTasksToInit = new HashSet<>(); private final Set<TaskId> pendingTasksToClose = new HashSet<>(); + // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>(); Tasks(final LogContext logContext) { this.log = logContext.logger(getClass()); } - void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) { - pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks); - pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks); + void purgePendingTasksToCreate() { + pendingActiveTasksToCreate.clear(); + pendingStandbyTasksToCreate.clear(); + } + + Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) { + final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies = + filterMap(pendingActiveTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName())); + + pendingActiveTasksToCreate.keySet().removeAll(pendingActiveTasksForTopologies.keySet()); + + return pendingActiveTasksForTopologies; + } + + Map<TaskId, Set<TopicPartition>> pendingStandbyTasksForTopologies(final Set<String> currentTopologies) { Review Comment: Ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org