ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r799261397



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1034,14 +1037,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
                 final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-                log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
-                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
-                resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may be duplicate too.
-                streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+                // Also, we create the new thread with initial values of cache size and max buffer size as 0
+                // and then resize them later
+                streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+                final int numLiveThreads = getNumLiveStreamThreads();
+                resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
One more thing -- @wcarlson5 mentioned there is an off-by-one error here due to
the re-ordering of these calls. The `+ 1` in this line used to be necessary
because we called `resizeThreadCache` before actually adding the new thread, so
we had to account for the new thread by adding one. But since we now create/add
the new thread first, `getNumLiveStreamThreads` already returns the correct
number of threads, so we don't need the `+ 1` anymore.
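
To make the off-by-one concrete, here is a minimal sketch of the two orderings.
It is not the Kafka Streams code: `TOTAL_CACHE_BYTES`, `cacheSizePerThread`, and
the list of thread names are hypothetical stand-ins, and it assumes (as
described above) that a newly added thread is included in the live count.

```java
import java.util.ArrayList;
import java.util.List;

public class ResizeOrderSketch {
    // Hypothetical total cache budget; not a real Kafka Streams config value.
    static final long TOTAL_CACHE_BYTES = 100L;

    // Hypothetical stand-in for getCacheSizePerThread: split the budget evenly.
    static long cacheSizePerThread(final int numThreads) {
        return numThreads == 0 ? TOTAL_CACHE_BYTES : TOTAL_CACHE_BYTES / numThreads;
    }

    // Old ordering: resize first, then add. The new thread is not counted yet,
    // so the "+ 1" is needed.
    static long resizeThenAdd(final List<String> liveThreads) {
        final long perThread = cacheSizePerThread(liveThreads.size() + 1);
        liveThreads.add("new-thread");
        return perThread;
    }

    // New ordering: add first, then resize. The new thread is already counted,
    // so no "+ 1" is needed.
    static long addThenResize(final List<String> liveThreads) {
        liveThreads.add("new-thread");
        return cacheSizePerThread(liveThreads.size());
    }

    // New ordering with the stale "+ 1" left in: the divisor is one too large.
    static long addThenResizeWithStalePlusOne(final List<String> liveThreads) {
        liveThreads.add("new-thread");
        return cacheSizePerThread(liveThreads.size() + 1);
    }

    public static void main(final String[] args) {
        final List<String> existing = List.of("t1", "t2", "t3");
        System.out.println(resizeThenAdd(new ArrayList<>(existing)));
        System.out.println(addThenResize(new ArrayList<>(existing)));
        System.out.println(addThenResizeWithStalePlusOne(new ArrayList<>(existing)));
    }
}
```

With three existing threads and a budget of 100, the first two orderings both
divide by 4, while the stale `+ 1` divides by 5 and under-allocates each thread.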
   
On that note, I take it we reordered these calls because we now create the
thread without the cache value and then call `resize` to set the cache after
the thread has been created. Why do we need this post-construction resizing? I
only looked at this part of the PR briefly, but it seems like we always know
the actual cache size when we're creating the thread, so can't we just pass it
in to the `StreamThread#create` method/constructor? It's a bit confusing to
initialize the cache size to a placeholder value of 0; it took me a little
while to figure out what was going on with that.
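
For illustration only, the two construction patterns being compared might look
like the sketch below. `Worker` is a hypothetical stand-in and not the real
`StreamThread` class, and the sketch covers only the new thread; whether the
real code can know the final size at construction time is exactly the open
question above.

```java
public class ConstructWithSizeSketch {
    // Hypothetical stand-in for a stream thread; not the real StreamThread API.
    static final class Worker {
        private long cacheSizeBytes;

        Worker(final long cacheSizeBytes) {
            this.cacheSizeBytes = cacheSizeBytes;
        }

        void resizeCache(final long newSizeBytes) {
            this.cacheSizeBytes = newSizeBytes;
        }

        long cacheSizeBytes() {
            return cacheSizeBytes;
        }
    }

    // Pattern in the diff: construct with a placeholder of 0, then resize.
    static Worker createThenResize(final long knownSizeBytes) {
        final Worker worker = new Worker(0L);
        worker.resizeCache(knownSizeBytes);
        return worker;
    }

    // Suggested alternative: pass the known size straight to the constructor.
    static Worker createWithSize(final long knownSizeBytes) {
        return new Worker(knownSizeBytes);
    }
}
```

Both paths end with the same cache size; the second avoids the intermediate
state in which the thread exists with a cache size of 0.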





