ableegoldman commented on a change in pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#discussion_r597215093



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
         return Optional.empty();
     }
 
-    // Returns the number of threads that are not in the DEAD state -- use this over threads.size()
+    // Returns the number of threads that are not in the DEAD or PENDING_SHUTDOWN state -- use this over threads.size()
     private int getNumLiveStreamThreads() {
         final AtomicInteger numLiveThreads = new AtomicInteger(0);
         synchronized (threads) {
             processStreamThread(thread -> {
                 if (thread.state() == StreamThread.State.DEAD) {
+                    log.debug("Trimming thread {} from the threads list since it's state is {}", thread.getName(), StreamThread.State.DEAD);
                     threads.remove(thread);
+                } else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) {

Review comment:
       Yep -- but imo this should be pretty rare. To hit an OOM you would need 
to have a relatively huge cache and few total threads (so that the cache size 
per thread is still large), and then you'd need the new thread to start up, 
rebalance to get tasks, finish initializing and restoring them, and then start 
consuming enough data to fill up the cache, all before the old thread gets 
around to closing its state stores. 
   Sure, threads can sometimes hang during termination, but in my experience 
that's usually after the store closure (and hopefully hanging is itself rare).
   
   My point is, this should be good enough for now, and I'd like to stick with 
the simplest solution this late in the game.
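
   To put rough numbers on why few threads plus a large cache is the risky 
combination, here is a minimal sketch. It is not the actual KafkaStreams 
code: the class, the `State` enum, and both helper methods are hypothetical, 
and it assumes the total cache is split evenly across live threads and that a 
thread in PENDING_SHUTDOWN can still hold up to one such share until it closes 
its state stores.

```java
import java.util.List;

public class CacheResizeSketch {
    // Hypothetical stand-in for StreamThread.State; only the states relevant here.
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    // Counts threads that are neither DEAD nor PENDING_SHUTDOWN.
    static int numLiveThreads(final List<State> states) {
        int live = 0;
        for (final State state : states) {
            if (state != State.DEAD && state != State.PENDING_SHUTDOWN) {
                live++;
            }
        }
        return live;
    }

    // Assumption: the total cache is divided evenly across live threads.
    static long cacheSizePerThread(final long totalCacheBytes, final int liveThreads) {
        return liveThreads == 0 ? totalCacheBytes : totalCacheBytes / liveThreads;
    }

    // Assumption: each PENDING_SHUTDOWN thread may still hold up to one
    // per-thread share, on top of the shares given to the live threads.
    static long worstCaseTransientBytes(final long totalCacheBytes,
                                        final int liveThreads,
                                        final int pendingShutdownThreads) {
        final long perThread = cacheSizePerThread(totalCacheBytes, liveThreads);
        return perThread * (liveThreads + pendingShutdownThreads);
    }

    public static void main(final String[] args) {
        final long total = 1000L; // arbitrary units

        // One replacement thread running while the old one is still shutting down.
        final List<State> fewThreads = List.of(State.PENDING_SHUTDOWN, State.RUNNING);
        System.out.println(worstCaseTransientBytes(total, numLiveThreads(fewThreads), 1));

        // Ten live threads plus one shutting down: the overshoot is much smaller.
        System.out.println(worstCaseTransientBytes(total, 10, 1));
    }
}
```

   Under these assumptions the single-thread case can briefly reach twice the 
configured cache, while the ten-thread case overshoots by only one tenth, and 
in both cases only if the new thread fills its cache before the old one 
closes its stores.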




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

