guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946157896


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,35 +43,48 @@
 class Tasks {
     private final Logger log;
 
+    // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
 
     // Tasks may have been assigned for a NamedTopology that is not yet known 
by this host. When that occurs we stash
     // these unknown tasks until either the corresponding NamedTopology is 
added and we can create them at last, or
     // we receive a new assignment and they are revoked from the thread.
-
-    // Tasks may have been assigned but not yet created because:
-    // 1. They are for a NamedTopology that is yet known by this host.
-    // 2. They are to be recycled from an existing restoring task yet to be 
returned from the state updater.
-    //
-    // When that occurs we stash these pending tasks until either they are 
finally clear to be created,
-    // or they are revoked from a new assignment.
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate 
= new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate 
= new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new 
HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> 
pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
     private final Set<TaskId> pendingTasksToClose = new HashSet<>();
 
+    // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new 
HashMap<>();
 
     Tasks(final LogContext logContext) {
         this.log = logContext.logger(getClass());
     }
 
-    void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, 
final Set<TaskId> assignedStandbyTasks) {
-        pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks);
-        pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks);
+    void purgePendingTasksToCreate() {
+        pendingActiveTasksToCreate.clear();
+        pendingStandbyTasksToCreate.clear();
+    }
+
+    Map<TaskId, Set<TopicPartition>> 
drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {

Review Comment:
   Ack.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,35 +43,48 @@
 class Tasks {
     private final Logger log;
 
+    // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
 
     // Tasks may have been assigned for a NamedTopology that is not yet known 
by this host. When that occurs we stash
     // these unknown tasks until either the corresponding NamedTopology is 
added and we can create them at last, or
     // we receive a new assignment and they are revoked from the thread.
-
-    // Tasks may have been assigned but not yet created because:
-    // 1. They are for a NamedTopology that is yet known by this host.
-    // 2. They are to be recycled from an existing restoring task yet to be 
returned from the state updater.
-    //
-    // When that occurs we stash these pending tasks until either they are 
finally clear to be created,
-    // or they are revoked from a new assignment.
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate 
= new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate 
= new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new 
HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> 
pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
     private final Set<TaskId> pendingTasksToClose = new HashSet<>();
 
+    // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new 
HashMap<>();
 
     Tasks(final LogContext logContext) {
         this.log = logContext.logger(getClass());
     }
 
-    void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, 
final Set<TaskId> assignedStandbyTasks) {
-        pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks);
-        pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks);
+    void purgePendingTasksToCreate() {
+        pendingActiveTasksToCreate.clear();
+        pendingStandbyTasksToCreate.clear();
+    }
+
+    Map<TaskId, Set<TopicPartition>> 
drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {
+        final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies 
=
+            filterMap(pendingActiveTasksToCreate, t -> 
currentTopologies.contains(t.getKey().topologyName()));
+
+        
pendingActiveTasksToCreate.keySet().removeAll(pendingActiveTasksForTopologies.keySet());
+
+        return pendingActiveTasksForTopologies;
+    }
+
+    Map<TaskId, Set<TopicPartition>> pendingStandbyTasksForTopologies(final 
Set<String> currentTopologies) {

Review Comment:
   Ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to