guozhangwang commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683823753
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
SGTM. What about the other comment, i.e. moving the `Map<String, InternalTopologyBuilder> builders` into the `TopologyVersion` itself? Besides the constructors, the only modifiers to `builders` seem to be `register/deregister`, in which we would always try to `getAndIncrement` version. So what about consolidating the modification of builders along with version bump, and hence we would not need to use a `ConcurrentNavigableMap`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
Just another thought: if this is just for the very short temporary phase between when the topology is removed to when the rebalance is finally triggered to remove the tasks (which should usually be in the next poll), could we just call `task.suspend` instead of adding the new `freeze` logic? When we finally close the tasks we can still transit from suspended to closed?
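For reference, a minimal sketch of the consolidation idea from the `TopologyMetadata.java` comment above, under stated assumptions: `VersionedBuilders`, its method names, and the generic builder type `B` are hypothetical and are not code from this PR. It also swaps the atomic `getAndIncrement` for a plain `long`, since every change to the map and to the version happens inside the same `synchronized` method.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch, not the code in this PR: the version counter and the
// builders map live in one object and are only ever modified together.
final class VersionedBuilders<B> {
    // Guarded by `this`; bumped on every successful register/deregister
    private long version = 0L;
    // Plain TreeMap (still sorted by topology name), guarded by `this`
    private final SortedMap<String, B> builders = new TreeMap<>();

    // Adds a builder and bumps the version in one critical section
    synchronized long register(final String name, final B builder) {
        if (builders.containsKey(name)) {
            throw new IllegalArgumentException("Topology already registered: " + name);
        }
        builders.put(name, builder);
        return ++version;
    }

    // Removes a builder and bumps the version in one critical section
    synchronized long deregister(final String name) {
        if (builders.remove(name) == null) {
            throw new IllegalArgumentException("Unknown topology: " + name);
        }
        return ++version;
    }

    synchronized long version() {
        return version;
    }

    // Readers get an immutable copy, so they never observe a half-applied update
    synchronized SortedMap<String, B> snapshot() {
        return Collections.unmodifiableSortedMap(new TreeMap<>(builders));
    }
}
```

The trade-off in this sketch is that readers pay for a copy on each `snapshot()` call; whether that is acceptable depends on how often the stream threads read the builders.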

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -719,6 +723,8 @@ void runOnce() {
         final long pollLatency = pollPhase();
+        topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
How about moving this ahead of `pollPhase()`? We are likely to be kicked out of the group while blocked waiting here, so it's better to be aware of that and re-join the group immediately, rather than doing the restore/etc still which may be all wasted work.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -867,7 +873,25 @@ private void initializeAndRestorePhase() {
         log.debug("Idempotent restore call done. Thread state has not changed.");
     }
+    // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+    private void checkForTopologyUpdates() {
+        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {
+            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+            taskManager.handleTopologyUpdates();
+
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");

Review comment:
very nit: just add a TODO that we can improve this case to not always enforce rebalance when version bumped, in case we forgot in future PRs?
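
A rough sketch of where the TODO from the `checkForTopologyUpdates` comment could sit, assuming a standalone helper: `TopologyUpdateDetector` and `checkForUpdate` are hypothetical names, not part of this PR, and the version source is modelled as a plain `LongSupplier` rather than the real `TopologyMetadata`. The sketch reads the version once per check so that the comparison and the stored value refer to the same number.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of this PR: remembers the last topology
// version a thread has seen and reports when a newer one appears.
final class TopologyUpdateDetector {
    private final LongSupplier currentVersion;
    private long lastSeenVersion;

    TopologyUpdateDetector(final LongSupplier currentVersion) {
        this.currentVersion = currentVersion;
        this.lastSeenVersion = currentVersion.getAsLong();
    }

    // Returns true when the version has advanced since the previous call
    boolean checkForUpdate() {
        final long latest = currentVersion.getAsLong(); // single read per check
        if (lastSeenVersion < latest) {
            lastSeenVersion = latest;
            // TODO: a version bump does not necessarily change this thread's
            // assignment, so the caller need not always enforce a rebalance here
            return true;
        }
        return false;
    }
}
```

A caller would invoke `checkForUpdate()` once per loop iteration and, for now, trigger the rebalance whenever it returns `true`.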