guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683823753



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
       SGTM.
   
   What about the other comment, i.e. moving the `Map<String,
InternalTopologyBuilder> builders` into `TopologyVersion` itself? Besides
the constructors, the only methods that modify `builders` seem to be
`register/deregister`, both of which always try to `getAndIncrement` the
version. So what about consolidating the modification of `builders` with the
version bump, so that we would not need a `ConcurrentNavigableMap`?
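
A minimal sketch of that consolidation, not the PR's actual code. `TopologyVersionSketch`, its method names, and the generic builder type `B` are hypothetical, and a plain `long` guarded by `synchronized` stands in for the atomic counter implied by `getAndIncrement`. The point is only that when the map is mutated under the same lock as the version bump, a plain `TreeMap` is enough:

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for TopologyVersion; all names here are assumptions.
final class TopologyVersionSketch<B> {
    private long version = 0L;
    // Plain TreeMap: every access goes through the synchronized methods below.
    private final SortedMap<String, B> builders = new TreeMap<>();

    // Adds a builder and bumps the version as one atomic step.
    synchronized long register(final String name, final B builder) {
        if (builders.containsKey(name)) {
            throw new IllegalArgumentException("Topology already registered: " + name);
        }
        builders.put(name, builder);
        return ++version;
    }

    // Removes a builder and bumps the version as one atomic step.
    synchronized long deregister(final String name) {
        if (!builders.containsKey(name)) {
            throw new IllegalArgumentException("Unknown topology: " + name);
        }
        builders.remove(name);
        return ++version;
    }

    synchronized long version() {
        return version;
    }

    // Returns a sorted copy so callers never iterate over the live map.
    synchronized SortedMap<String, B> snapshot() {
        return Collections.unmodifiableSortedMap(new TreeMap<>(builders));
    }
}
```

Readers pay for a copy on each `snapshot()` call, which seems acceptable if topology changes and full scans are both rare; that trade-off is an assumption, not something measured here.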

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }
 
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
       Just another thought: if this only covers the very short window between
the topology being removed and the rebalance finally being triggered to remove
the tasks (which should usually happen in the next poll), could we just call
`task.suspend` instead of adding the new `freeze` logic? When we finally close
the tasks, we can still transition from suspended to closed.
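
A rough sketch of why suspension might already cover this case. This is a deliberately simplified, hypothetical state machine (`TaskStateSketch` and `TaskSketch` are made-up names, and the real task lifecycle in Streams has more states), showing only that a suspended task stops processing and can still be closed later:

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical lifecycle; not the actual Kafka Streams task states.
enum TaskStateSketch {
    RUNNING, SUSPENDED, CLOSED;

    // States that may legally follow this one in the sketch.
    Set<TaskStateSketch> validNext() {
        switch (this) {
            case RUNNING:
                return EnumSet.of(SUSPENDED);
            case SUSPENDED:
                return EnumSet.of(RUNNING, CLOSED);
            default:
                return EnumSet.noneOf(TaskStateSketch.class);
        }
    }
}

final class TaskSketch {
    private TaskStateSketch state = TaskStateSketch.RUNNING;

    // Stops processing without a dedicated "frozen" state.
    void suspend() {
        transitionTo(TaskStateSketch.SUSPENDED);
    }

    // Reachable from SUSPENDED, so a later rebalance can still close the task.
    void close() {
        transitionTo(TaskStateSketch.CLOSED);
    }

    boolean isProcessable() {
        return state == TaskStateSketch.RUNNING;
    }

    TaskStateSketch state() {
        return state;
    }

    private void transitionTo(final TaskStateSketch next) {
        if (!state.validNext().contains(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }
}
```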

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -719,6 +723,8 @@ void runOnce() {
 
         final long pollLatency = pollPhase();
 
+        topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
       How about moving this ahead of `pollPhase()`? We are likely to be kicked
out of the group while blocked waiting here, so it's better to find that out
and re-join the group immediately, rather than still doing the restore etc.,
which may all be wasted work.
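
To illustrate the suggested ordering, here is a toy sketch that records the order of phases in one loop iteration. `RunOnceOrderSketch`, the `BooleanSupplier`, and the string trace are all hypothetical stand-ins, not the real `StreamThread` API. Because the wait comes first, the poll that follows it is the first thing to run after unblocking, so a lost group membership would surface before any restore work starts:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical model of one loop iteration; names are assumptions.
final class RunOnceOrderSketch {
    private final BooleanSupplier topologyIsEmpty;
    private final List<String> trace = new ArrayList<>();

    RunOnceOrderSketch(final BooleanSupplier topologyIsEmpty) {
        this.topologyIsEmpty = topologyIsEmpty;
    }

    void runOnce() {
        // Block first, while there is nothing to process.
        while (topologyIsEmpty.getAsBoolean()) {
            trace.add("wait");
            Thread.yield();
        }
        // Poll right after unblocking, so membership changes are seen early.
        trace.add("poll");
        // Only then do the potentially expensive restore work.
        trace.add("restore");
    }

    List<String> trace() {
        return trace;
    }
}
```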

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -867,7 +873,25 @@ private void initializeAndRestorePhase() {
         log.debug("Idempotent restore call done. Thread state has not changed.");
     }
 
+    // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+    private void checkForTopologyUpdates() {
+        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {
+            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+            taskManager.handleTopologyUpdates();
+
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");

Review comment:
       very nit: could we add a TODO noting that we can improve this case to not
always enforce a rebalance when the version is bumped, in case we forget in
future PRs?



