guozhangwang commented on a change in pull request #11857: URL: https://github.com/apache/kafka/pull/11857#discussion_r820993077
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -161,35 +161,47 @@ public void registerThread(final String threadName) { public void unregisterThread(final String threadName) { threadVersions.remove(threadName); - maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName); + maybeNotifyTopologyVersionListeners(); } public TaskExecutionMetadata taskExecutionMetadata() { return taskExecutionMetadata; } - public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) { + public Set<String> updateThreadTopologyVersion(final String threadName) { try { - lock(); - final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator(); - TopologyVersionWaiters topologyVersionWaiters; + version.topologyLock.lock(); threadVersions.put(threadName, topologyVersion()); + return namedTopologiesView(); + } finally { + version.topologyLock.unlock(); + } + } + + public void maybeNotifyTopologyVersionListeners() { + try { + lock(); + final long minThreadVersion = getMinimumThreadVersion(); + final Iterator<TopologyVersionListener> iterator = version.activeTopologyUpdateListeners.listIterator(); + TopologyVersionListener topologyVersionListener; while (iterator.hasNext()) { - topologyVersionWaiters = iterator.next(); - final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion; - if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) { - if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) { - topologyVersionWaiters.future.complete(null); - iterator.remove(); - log.info("All threads are now on topology version {}", topologyVersionWaiters.topologyVersion); - } + topologyVersionListener = iterator.next(); + final long topologyVersionWaitersVersion = topologyVersionListener.topologyVersion; + if (minThreadVersion >= topologyVersionWaitersVersion) { Review comment: Hmm... just to make sure we are talking about `version.activeTopologyUpdateListeners` right? These listeners are for the calling thread of the `removeNamedTopology / addNamedTopology / start`, which would get the wraped futures these listeners are constructed on. Anyways, my understanding is that when a thread is removed, the `getMinimumThreadVersion` returned version would not take that removed thread into consideration, so that even the removed thread's version is low it would not block the future being completed. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -161,35 +161,47 @@ public void registerThread(final String threadName) { public void unregisterThread(final String threadName) { threadVersions.remove(threadName); - maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName); + maybeNotifyTopologyVersionListeners(); } public TaskExecutionMetadata taskExecutionMetadata() { return taskExecutionMetadata; } - public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) { + public Set<String> updateThreadTopologyVersion(final String threadName) { try { - lock(); - final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator(); - TopologyVersionWaiters topologyVersionWaiters; + version.topologyLock.lock(); threadVersions.put(threadName, topologyVersion()); + return namedTopologiesView(); + } finally { + version.topologyLock.unlock(); + } + } + + public void maybeNotifyTopologyVersionListeners() { + try { + lock(); + final long minThreadVersion = getMinimumThreadVersion(); + final Iterator<TopologyVersionListener> iterator = version.activeTopologyUpdateListeners.listIterator(); + TopologyVersionListener topologyVersionListener; Review comment: nit: this `TopologyVersionListener topologyVersionListener` could be declared within the while loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org