guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -64,6 +65,9 @@
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final StreamThread.ProcessingMode processingMode;
+    // tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+    private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

Review comment:
       Yeah, I left this comment before syncing with @ableegoldman offline, so it's better to clarify for others reading this PR :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) {
      */
     public void cleanUpNamedTopology(final String name) {
         if (getTopologyByName(name).isPresent()) {
-            throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
+            throw new IllegalStateException("Can't clean up local state for an active NamedTopology: ");

Review comment:
       The message now ends with ": " but the topology name was not appended? :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
       Not sure I understand this logic. Could you explain a bit? Also, there's no corresponding unfreeze function, so once a task is frozen would it stay in that state forever?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
+    void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                           final Set<TaskId> assignedActiveTasks,
+                                           final Set<TaskId> assignedStandbyTasks) {
+        activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
       The diff between `assignedActiveTasks` and `activeTasksToCreate` is the set of tasks that are already owned and hence do not need to be assigned, am I reading that right (ditto for standbys)? If so, why should these be passed to `removeRevokedUnknownTasks`, i.e. treated as revoked?
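
To make the question about the `diff(...)` call concrete, here is a minimal sketch of what it would compute, assuming the helper returns the elements of its first set that are absent from the second. The class `AssignmentDiffSketch`, its local `diff` method, and the string task ids are hypothetical stand-ins for illustration only; they are not the PR's code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

class AssignmentDiffSketch {

    // Hypothetical stand-in for the diff helper called in the PR.
    // Assumption: it returns every element of `left` that is not in `right`.
    static <E> Set<E> diff(final Supplier<Set<E>> constructor,
                           final Set<E> left,
                           final Set<E> right) {
        final Set<E> result = constructor.get();
        for (final E element : left) {
            if (!right.contains(element)) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Task ids are plain strings here purely for illustration.
        final Set<String> assignedActiveTasks = new HashSet<>(Arrays.asList("0_0", "0_1", "1_0"));
        // Only "1_0" is new and still has to be created.
        final Set<String> activeTasksToCreate = new HashSet<>(Arrays.asList("1_0"));

        // What remains is the part of the assignment that does not need creating,
        // i.e. the tasks the reviewer reads as "already owned".
        final Set<String> remaining = diff(HashSet::new, assignedActiveTasks, activeTasksToCreate);

        // Sorted copy so the printed order is deterministic.
        System.out.println(new TreeSet<>(remaining)); // prints [0_0, 0_1]
    }
}
```

Under that assumption the argument handed to `removeRevokedUnknownTasks` contains tasks that are still assigned, which is the apparent mismatch with the method name that the comment asks about.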