guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -64,6 +65,9 @@
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final StreamThread.ProcessingMode processingMode;
 
+    // tasks may be assigned for a NamedTopology that is not yet known by this 
host, and saved for later creation
+    private final Map<TaskId, Set<TopicPartition>>  unknownTasksToBeCreated = 
new HashMap<>();

Review comment:
       Yeah I left this comment before syncing with @ableegoldman offline, so 
it's better to clarify for others reading this PR :)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String 
topologyToRemove) {
      */
     public void cleanUpNamedTopology(final String name) {
         if (getTopologyByName(name).isPresent()) {
-            throw new IllegalStateException("Can't clean up local state for an 
active NamedTopology");
+            throw new IllegalStateException("Can't clean up local state for an 
active NamedTopology: ");

Review comment:
       The topology was not included? :)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition 
topicPartition, final Lon
         }
     }
 
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any 
assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if 
(topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
       Not sure I understand this logic.. could you explain a bit? Also there's 
no unfrozen function so once a task is frozen it would stick with that state 
forever?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> 
mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
 
+    void handleNewAssignmentAndCreateTasks(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
+                                           final Set<TaskId> 
assignedActiveTasks,
+                                           final Set<TaskId> 
assignedStandbyTasks) {
+        activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, 
assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
       The diff between `assignedActiveTasks` and `activeTasksToCreate` are 
those tasks that are already owned and hence do not need to be assigned, am I 
reading that right (ditto for standbys)? Why these should be considered as 
remove revoked?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to