mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533627143



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client 
may be shared among threads
         adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new 
HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new 
ArrayList<>();
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
         for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
+            createStreamThread(cacheSizePerThread, i + 1);
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
             Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
 
-        final StreamStateListener streamStateListener = new 
StreamStateListener(threadState, globalThreadState);
+        streamStateListener = new StreamStateListener(threadState, 
globalThreadState);

Review comment:
       Maybe we had some cyclic dependency at some point in the past? Not sure.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to