janchilling commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2021650402
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
- time,
- config,
- adminClient,
- mainConsumer,
- restoreConsumer,
- changelogReader,
- originalReset,
- taskManager,
- stateUpdater,
- streamsMetrics,
- topologyMetadata,
- processId,
- threadId,
- logContext,
- referenceContainer.assignmentErrorCode,
- referenceContainer.nextScheduledRebalanceMs,
- referenceContainer.nonFatalExceptionsToHandle,
- shutdownErrorHook,
- streamsUncaughtExceptionHandler,
- cache::resize
+ time,
+ config,
+ adminClient,
+ mainConsumer,
+ restoreConsumer,
+ changelogReader,
+ originalReset,
+ taskManager,
+ stateUpdater,
+ streamsMetrics,
+ topologyMetadata,
+ processId,
+ threadId,
+ logContext,
+ referenceContainer.assignmentErrorCode,
+ referenceContainer.nextScheduledRebalanceMs,
+ referenceContainer.nonFatalExceptionsToHandle,
+ shutdownErrorHook,
+ streamsUncaughtExceptionHandler,
+ cache::resize
);
return streamThread.updateThreadMetadata(adminClientId(clientId));
}
private static DefaultTaskManager maybeCreateSchedulingTaskManager(final
boolean processingThreadsEnabled,
- final
boolean stateUpdaterEnabled,
final
TopologyMetadata topologyMetadata,
final
Time time,
final
String threadId,
final
Tasks tasks) {
if (processingThreadsEnabled) {
- if (!stateUpdaterEnabled) {
- throw new IllegalStateException("Processing threads require
the state updater to be enabled");
- }
final DefaultTaskManager defaultTaskManager = new
DefaultTaskManager(
- time,
- threadId,
- tasks,
- new DefaultTaskExecutorCreator(),
- topologyMetadata.taskExecutionMetadata(),
- 1
+ time,
+ threadId,
+ tasks,
+ new DefaultTaskExecutorCreator(),
+ topologyMetadata.taskExecutionMetadata(),
+ 1
);
defaultTaskManager.startTaskExecutors();
return defaultTaskManager;
}
return null;
}
- private static StateUpdater maybeCreateAndStartStateUpdater(final boolean
stateUpdaterEnabled,
- final
StreamsMetricsImpl streamsMetrics,
+ private static StateUpdater maybeCreateAndStartStateUpdater(final
StreamsMetricsImpl streamsMetrics,
Review Comment:
Updated the method name!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org