guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925882848


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -34,29 +33,40 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
     private final Logger log;
-    private final TopologyMetadata topologyMetadata;
-
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
-    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
-    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
 
     // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+
+    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
+    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
+    // we receive a new assignment and they are revoked from the thread.
+
+    // Tasks may have been assigned but not yet created because:
+    // 1. They are for a NamedTopology that is not yet known by this host.
+    // 2. They are to be recycled from an existing restoring task yet to be returned from the task updater.

Review Comment:
   Oops, my bad :) thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

