guozhangwang commented on a change in pull request #11857:
URL: https://github.com/apache/kafka/pull/11857#discussion_r820993077



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -161,35 +161,47 @@ public void registerThread(final String threadName) {
 
     public void unregisterThread(final String threadName) {
         threadVersions.remove(threadName);
-        
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
+        maybeNotifyTopologyVersionListeners();
     }
 
     public TaskExecutionMetadata taskExecutionMetadata() {
         return taskExecutionMetadata;
     }
 
-    public void 
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String 
threadName) {
+    public Set<String> updateThreadTopologyVersion(final String threadName) {
         try {
-            lock();
-            final Iterator<TopologyVersionWaiters> iterator = 
version.activeTopologyWaiters.listIterator();
-            TopologyVersionWaiters topologyVersionWaiters;
+            version.topologyLock.lock();
             threadVersions.put(threadName, topologyVersion());
+            return namedTopologiesView();
+        } finally {
+            version.topologyLock.unlock();
+        }
+    }
+
+    public void maybeNotifyTopologyVersionListeners() {
+        try {
+            lock();
+            final long minThreadVersion = getMinimumThreadVersion();
+            final Iterator<TopologyVersionListener> iterator = 
version.activeTopologyUpdateListeners.listIterator();
+            TopologyVersionListener topologyVersionListener;
             while (iterator.hasNext()) {
-                topologyVersionWaiters = iterator.next();
-                final long topologyVersionWaitersVersion = 
topologyVersionWaiters.topologyVersion;
-                if (topologyVersionWaitersVersion <= 
threadVersions.get(threadName)) {
-                    if (threadVersions.values().stream().allMatch(t -> t >= 
topologyVersionWaitersVersion)) {
-                        topologyVersionWaiters.future.complete(null);
-                        iterator.remove();
-                        log.info("All threads are now on topology version {}", 
topologyVersionWaiters.topologyVersion);
-                    }
+                topologyVersionListener = iterator.next();
+                final long topologyVersionWaitersVersion = 
topologyVersionListener.topologyVersion;
+                if (minThreadVersion >= topologyVersionWaitersVersion) {

Review comment:
       Hmm... just to make sure we are talking about 
`version.activeTopologyUpdateListeners` right? These listeners are for the 
calling thread of the `removeNamedTopology / addNamedTopology / start`, which 
would get the wraped futures these listeners are constructed on.
   
   Anyways, my understanding is that when a thread is removed, the 
`getMinimumThreadVersion` returned version would not take that removed thread 
into consideration, so that even the removed thread's version is low it would 
not block the future being completed.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -161,35 +161,47 @@ public void registerThread(final String threadName) {
 
     public void unregisterThread(final String threadName) {
         threadVersions.remove(threadName);
-        
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
+        maybeNotifyTopologyVersionListeners();
     }
 
     public TaskExecutionMetadata taskExecutionMetadata() {
         return taskExecutionMetadata;
     }
 
-    public void 
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String 
threadName) {
+    public Set<String> updateThreadTopologyVersion(final String threadName) {
         try {
-            lock();
-            final Iterator<TopologyVersionWaiters> iterator = 
version.activeTopologyWaiters.listIterator();
-            TopologyVersionWaiters topologyVersionWaiters;
+            version.topologyLock.lock();
             threadVersions.put(threadName, topologyVersion());
+            return namedTopologiesView();
+        } finally {
+            version.topologyLock.unlock();
+        }
+    }
+
+    public void maybeNotifyTopologyVersionListeners() {
+        try {
+            lock();
+            final long minThreadVersion = getMinimumThreadVersion();
+            final Iterator<TopologyVersionListener> iterator = 
version.activeTopologyUpdateListeners.listIterator();
+            TopologyVersionListener topologyVersionListener;

Review comment:
       nit: this `TopologyVersionListener topologyVersionListener` could be 
declared within the while loop.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to