cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553988171



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1432,7 +1480,9 @@ public void cleanUp() {
         validateIsRunningOrRebalancing();
         final Set<ThreadMetadata> threadMetadata = new HashSet<>();
         for (final StreamThread thread : threads) {
-            threadMetadata.add(thread.threadMetadata());
+            if (thread.state() != StreamThread.State.DEAD) {
+                threadMetadata.add(thread.threadMetadata());
+            }
         }
         return threadMetadata;

Review comment:
       Ah yes, you are right! The stream thread might be replaced or just 
normally shut down which would not be synchronized on the `changeThreadCount` 
lock. However, we still do not guarantee that the state is correct when the 
method returns, because the state could change after the lock is released but 
before the method returns. At this point with or without lock it doesn't 
matter. Either we find something that synchronizes the whole method or we can 
also remove the synchronisation on the stream thread state. And also if we find 
something that synchronizes the whole method, I am not sure if this guarantee 
is worth the hassle. WDYT? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to