wcarlson5 commented on a change in pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#discussion_r597208785



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -971,6 +972,7 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
             synchronized (changeThreadCount) {
                 threadIdx = getNextThreadIndex();
                 cacheSizePerThread = 
getCacheSizePerThread(getNumLiveStreamThreads() + 1);
+                log.info("Adding a new StreamThread with thread id {}; new 
cache size per thread is {}", threadIdx, cacheSizePerThread);

Review comment:
       Maybe log number of threads here too?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
         return Optional.empty();
     }
 
-    // Returns the number of threads that are not in the DEAD state -- use 
this over threads.size()
+    // Returns the number of threads that are not in the DEAD or 
PENDING_SHUTDOWN state -- use this over threads.size()
     private int getNumLiveStreamThreads() {
         final AtomicInteger numLiveThreads = new AtomicInteger(0);
         synchronized (threads) {
             processStreamThread(thread -> {
                 if (thread.state() == StreamThread.State.DEAD) {
+                    log.debug("Trimming thread {} from the threads list since 
it's state is {}", thread.getName(), StreamThread.State.DEAD);
                     threads.remove(thread);
+                } else if (thread.state() == 
StreamThread.State.PENDING_SHUTDOWN) {

Review comment:
       are we risking a memory overflow with this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to