guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682871868



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) {
      */
     public void cleanUpNamedTopology(final String name) {
         if (getTopologyByName(name).isPresent()) {
-            throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
+            throw new IllegalStateException("Can't clean up local state for an active NamedTopology: ");

Review comment:
       The topology name was not included in the message? :)
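
A minimal sketch of what the message was presumably meant to look like, with the topology name appended after the trailing colon. The class name `CleanUpMessageSketch`, its `activeTopologies` field, and this simplified `getTopologyByName` are hypothetical stand-ins so the snippet compiles on its own; they are not the actual wrapper code.

```java
import java.util.Optional;
import java.util.Set;

public class CleanUpMessageSketch {
    // Hypothetical stand-in for the wrapper's view of currently active topologies.
    private final Set<String> activeTopologies;

    public CleanUpMessageSketch(final Set<String> activeTopologies) {
        this.activeTopologies = activeTopologies;
    }

    // Hypothetical simplification of getTopologyByName(name): present if still active.
    Optional<String> getTopologyByName(final String name) {
        return activeTopologies.contains(name) ? Optional.of(name) : Optional.empty();
    }

    public void cleanUpNamedTopology(final String name) {
        if (getTopologyByName(name).isPresent()) {
            // Append the name so the trailing ": " in the message is followed by something.
            throw new IllegalStateException(
                "Can't clean up local state for an active NamedTopology: " + name);
        }
        // The local state clean-up itself is omitted from this sketch.
    }
}
```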

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }
 
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
       Not sure I understand this logic; could you explain a bit? Also, there's 
no unfreeze function, so once a task is frozen it would stay in that state 
forever?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
 
+    void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                           final Set<TaskId> assignedActiveTasks,
+                                           final Set<TaskId> assignedStandbyTasks) {
+        activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
       The diff between `assignedActiveTasks` and `activeTasksToCreate` is the 
set of tasks that are already owned and hence do not need to be assigned, am I 
reading that right (ditto for standbys)? Why should these be treated as revoked 
tasks to remove?
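
To make the question concrete, here is a small sketch of what that call would compute, assuming `diff(constructor, left, right)` returns the elements of `left` that are not in `right`. The `DiffSketch` class, the local `diff` helper, and the string task ids are hypothetical and only illustrate that assumed semantics.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class DiffSketch {
    // Hypothetical helper with the assumed semantics: elements of left that are not in right.
    static <E> Set<E> diff(final Supplier<Set<E>> constructor,
                           final Set<E> left,
                           final Set<E> right) {
        final Set<E> result = constructor.get();
        result.addAll(left);
        result.removeAll(right);
        return result;
    }

    // Assigned tasks that do not have to be created, i.e. (by the reading above) already owned.
    static Set<String> assignedButNotToCreate(final Set<String> assignedActiveTasks,
                                              final Set<String> activeTasksToCreate) {
        return diff(HashSet::new, assignedActiveTasks, activeTasksToCreate);
    }
}
```

With assigned tasks {0_0, 0_1, 1_0} and tasks to create {1_0}, the result under this assumption is {0_0, 0_1}: exactly the tasks that are still assigned and already exist, which is why passing them to something named `removeRevokedUnknownTasks` looks surprising.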




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
