ableegoldman commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r791115535

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1140,6 +1140,11 @@ private void completeShutdown(final boolean cleanRun) {
         topologyMetadata.unregisterThread(threadMetadata.threadName());
+        try {
+            topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
Oh good catch, I guess that got missed since I ported this fix from [#11601](https://github.com/apache/kafka/pull/11601/) (since that doesn't need to make 0.24) -- fixed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -138,6 +138,7 @@ public void registerThread(final String threadName) {
     public void unregisterThread(final String threadName) {
         threadVersions.remove(threadName);
+        maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);

Review comment:
Yeah, sorry I should have left an explanation for this -- basically if we have `#add/removeNamedTopology` futures blocked on this one thread ack'ing the topology update, and that update occurs between when the thread checks for these updates and when it checks for/goes into shutdown, then we need to update the waiters and potentially complete any that have just been waiting on this thread which has missed the update before shutting down

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -224,6 +225,7 @@ public void maybeWaitForNonEmptyTopology(final Supplier<StreamThread.State> thre
         } finally {
             unlock();
         }
+        log.info("Removed NamedTopology {} and updated topology version to {}", topologyName, version.topologyVersion.get());

Review comment:
There's a semantic difference but it's hard to notice, the first one says "Remov_ing_" when we start removing it, whereas this one says "Remov_ed_" to indicate that we have finished.
That said, it's probably worth clarifying further, e.g. `"Finished removing..."` -- I'll update it to make the purpose of each line clearer

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -319,6 +315,14 @@ private void verifyTopologyStateStore(final String topologyName, final String st
         return streamsMetadataState.getAllMetadataForStore(storeName, topologyName);
     }

+    /**
+     * See {@link KafkaStreams#metadataForAllStreamsClients()}
+     */
+    public Collection<StreamsMetadata> allStreamsClientsMetadataForTopology(final String topologyName) {

Review comment:
Fair, I was thinking of them as semantically different since, for example, in this case we are changing the meaning of the original method to return only a subset of the metadata. Whereas for something like `#store`, I felt it made sense to keep the same name, since the meaning of this API doesn't change when using named topologies -- you just happen to need one additional parameter in order to uniquely identify a state store, since the name alone could point to multiple stores from different topologies. But it's definitely something to discuss when we get to the KIP proposal

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
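
For anyone following the `unregisterThread` comment above, here is a rough, self-contained sketch of the race it describes. It is not the actual `TopologyMetadata` code: the class `TopologyVersionSketch`, the methods `updateTopology` and `ackLatestVersion`, and the `notifyWaiters` flag are all hypothetical names made up for illustration, and the real `maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion` may well work differently. The sketch only assumes that each waiter is a future completed once every still-registered thread has acked its topology version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the version bookkeeping; NOT the real TopologyMetadata.
final class TopologyVersionSketch {

    // A pending add/removeNamedTopology-style future and the version it waits for.
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(final long version) {
            this.version = version;
        }
    }

    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();
    private long topologyVersion = 0L;

    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
    }

    // Models a topology update: bump the version and hand back a future that
    // completes once all registered threads have acked the new version.
    synchronized CompletableFuture<Void> updateTopology() {
        topologyVersion++;
        final Waiter waiter = new Waiter(topologyVersion);
        waiters.add(waiter);
        completeSatisfiedWaiters();
        return waiter.future;
    }

    // Models a live thread noticing the update in its main loop.
    synchronized void ackLatestVersion(final String threadName) {
        threadVersions.replace(threadName, topologyVersion);
        completeSatisfiedWaiters();
    }

    // notifyWaiters == false mimics the behavior before the one-line fix in the diff;
    // notifyWaiters == true mimics re-checking the waiters when a thread shuts down.
    synchronized void unregisterThread(final String threadName, final boolean notifyWaiters) {
        threadVersions.remove(threadName);
        if (notifyWaiters) {
            completeSatisfiedWaiters();
        }
    }

    // Completes every waiter whose version has been acked by all remaining threads.
    private void completeSatisfiedWaiters() {
        final long minAcked = threadVersions.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(topologyVersion);
        waiters.removeIf(waiter -> {
            if (waiter.version > minAcked) {
                return false;
            }
            waiter.future.complete(null);
            return true;
        });
    }
}
```

In this sketch, if threads `t1` and `t2` are registered, `updateTopology()` is called, and only `t1` acks, then `unregisterThread("t2", false)` leaves the future pending with no one left to complete it, whereas `unregisterThread("t2", true)` completes it because the only thread that had not acked is gone.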