ableegoldman commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r791121073



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -138,6 +138,7 @@ public void registerThread(final String threadName) {
 
     public void unregisterThread(final String threadName) {
         threadVersions.remove(threadName);
+        
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);

Review comment:
       Yeah, sorry I should have left an explanation for this -- basically if 
we have `#add/removeNamedTopology` futures blocked on this one thread ack'ing 
the topology update, and that update occurs between when the thread checks for 
these updates and when it checks for/goes into shutdown, then we need to update 
the waiters and potentially complete any that have just been waiting on this 
thread which has missed the update before shutting down




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to