mjsax commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533627143
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, time, globalThreadId, delegatingStateRestoreListener, - this::defaultStreamsUncaughtExceptionHandler + streamsUncaughtExceptionHandler ); globalThreadState = globalStreamThread.state(); } // use client id instead of thread client id since this admin client may be shared among threads adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); - final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads); - final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>(); + threadState = new HashMap<>(numStreamThreads); + storeProviders = new ArrayList<>(); for (int i = 0; i < numStreamThreads; i++) { - final StreamThread streamThread = StreamThread.create( - internalTopologyBuilder, - config, - clientSupplier, - adminClient, - processId, - clientId, - streamsMetrics, - time, - streamsMetadataState, - cacheSizePerThread, - stateDirectory, - delegatingStateRestoreListener, - i + 1, - KafkaStreams.this::closeToError, - this::defaultStreamsUncaughtExceptionHandler - ); - threads.add(streamThread); - threadState.put(streamThread.getId(), streamThread.state()); - storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); + createStreamThread(cacheSizePerThread, i + 1); } ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); - final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); + streamStateListener = new StreamStateListener(threadState, globalThreadState); Review comment: Maybe we had some cyclic dependency at some point in the past? Not sure. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org