ableegoldman commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r791115535



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1140,6 +1140,11 @@ private void completeShutdown(final boolean cleanRun) {
 
         topologyMetadata.unregisterThread(threadMetadata.threadName());
 
+        try {
+            topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
       Oh, good catch. I guess that got missed when I ported this fix from 
[#11601](https://github.com/apache/kafka/pull/11601/) (since that doesn't need 
to make 0.24). Fixed

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -138,6 +138,7 @@ public void registerThread(final String threadName) {
 
     public void unregisterThread(final String threadName) {
         threadVersions.remove(threadName);
+        maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);

Review comment:
       Yeah, sorry, I should have left an explanation for this. Basically, if 
we have `#add/removeNamedTopology` futures blocked on this one thread ack'ing 
the topology update, and that update occurs between when the thread checks for 
these updates and when it checks for/goes into shutdown, then the thread shuts 
down without ever seeing the update. So when it unregisters, we need to update 
the waiters and potentially complete any that were waiting only on this thread
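
A minimal sketch of the situation described above, not the actual `TopologyMetadata` code: `VersionTracker`, `awaitVersion`, `ackVersion`, and `completeSatisfiedWaiters` are hypothetical names, and the rule that a waiter completes once every remaining thread has reached its version (including when no threads remain) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the version bookkeeping; not the real TopologyMetadata.
class VersionTracker {
    // A pending add/remove request that completes once all live threads reach `version`.
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(final long version) {
            this.version = version;
        }
    }

    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();

    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    synchronized CompletableFuture<Void> awaitVersion(final long version) {
        final Waiter waiter = new Waiter(version);
        waiters.add(waiter);
        completeSatisfiedWaiters();
        return waiter.future;
    }

    synchronized void ackVersion(final String threadName, final long version) {
        threadVersions.put(threadName, version);
        completeSatisfiedWaiters();
    }

    synchronized void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
        // Without this re-check, a waiter blocked only on the departed thread never completes.
        completeSatisfiedWaiters();
    }

    private void completeSatisfiedWaiters() {
        // Assumption: with no threads left, nothing can block a waiter, so treat the minimum as "infinite".
        final long minVersion = threadVersions.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MAX_VALUE);
        final Iterator<Waiter> it = waiters.iterator();
        while (it.hasNext()) {
            final Waiter waiter = it.next();
            if (minVersion >= waiter.version) {
                waiter.future.complete(null);
                it.remove();
            }
        }
    }
}
```

In this sketch, a thread that shuts down before calling `ackVersion` still unblocks the future, because `unregisterThread` re-evaluates the waiters after removing the thread.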

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -224,6 +225,7 @@ public void maybeWaitForNonEmptyTopology(final Supplier<StreamThread.State> thre
         } finally {
             unlock();
         }
+        log.info("Removed NamedTopology {} and updated topology version to {}", topologyName, version.topologyVersion.get());

Review comment:
       There's a semantic difference, but it's hard to notice: the first one 
says "Remov_ing_" when we start removing it, whereas this one says "Remov_ed_" 
to indicate that we have finished. That said, it's probably worth clarifying 
further, e.g. `"Finished removing..."` -- I'll update it to make the purpose of 
each line clearer

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -319,6 +315,14 @@ private void verifyTopologyStateStore(final String topologyName, final String st
         return streamsMetadataState.getAllMetadataForStore(storeName, topologyName);
     }
 
+    /**
+     * See {@link KafkaStreams#metadataForAllStreamsClients()}
+     */
+    public Collection<StreamsMetadata> allStreamsClientsMetadataForTopology(final String topologyName) {

Review comment:
       Fair. I was thinking of them as semantically different since, for 
example, in this case we are changing the meaning of the original method to 
return only a subset of the metadata. Whereas for something like `#store`, I 
felt it made sense to keep the same name since the meaning of the API doesn't 
change when using named topologies: you just happen to need one additional 
parameter to uniquely identify a state store, since the name alone could point 
to multiple stores from different topologies. But it's definitely something to 
discuss when we get to the KIP proposal
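
To make the naming distinction concrete, here is a toy sketch. `ToyRegistry` and all of its methods are hypothetical and do not mirror the Kafka Streams API. It only illustrates the two cases: a lookup whose meaning is unchanged and merely needs the topology name to disambiguate, versus a query whose result becomes a subset and so arguably deserves its own name.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical toy registry; names are illustrative only.
class ToyRegistry {
    // One entry per (topology, store) pair hosted by some client.
    static final class Entry {
        final String topology;
        final String store;
        final String host;

        Entry(final String topology, final String store, final String host) {
            this.topology = topology;
            this.store = store;
            this.host = host;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void add(final String topology, final String store, final String host) {
        entries.add(new Entry(topology, store, host));
    }

    // Same meaning as a plain store lookup: the topology name only disambiguates
    // stores that share a name across topologies.
    Optional<Entry> store(final String storeName, final String topologyName) {
        return entries.stream()
            .filter(e -> e.store.equals(storeName) && e.topology.equals(topologyName))
            .findFirst();
    }

    // Returns every known client host, in insertion order, without duplicates.
    List<String> allHosts() {
        final Set<String> hosts = new LinkedHashSet<>();
        for (final Entry e : entries) {
            hosts.add(e.host);
        }
        return new ArrayList<>(hosts);
    }

    // Different meaning from allHosts(): only the subset hosting the given topology,
    // which is the argument for giving it a distinct name.
    List<String> hostsForTopology(final String topologyName) {
        final Set<String> hosts = new LinkedHashSet<>();
        for (final Entry e : entries) {
            if (e.topology.equals(topologyName)) {
                hosts.add(e.host);
            }
        }
        return new ArrayList<>(hosts);
    }
}
```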




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
