cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2016429245
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean>
streamsUncaughtExceptionHandler) {
- final boolean stateUpdaterEnabled =
InternalConfig.stateUpdaterEnabled(config.originals());
Review Comment:
Could you please also remove the internal config `__state.updater.enabled__`
and all corresponding code from `StreamsConfig`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -237,13 +237,13 @@ State setState(final State newState) {
if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
log.debug("Ignoring request to transit from PENDING_SHUTDOWN
to {}: " +
- "only DEAD state is a valid next state",
newState);
+ "only DEAD state is a valid next state", newState);
// when the state is already in PENDING_SHUTDOWN, all other
transitions will be
// refused but we do not throw exception here
return null;
} else if (state == State.DEAD) {
log.debug("Ignoring request to transit from DEAD to {}: " +
- "no valid next state after DEAD", newState);
+ "no valid next state after DEAD", newState);
Review Comment:
Could you revert this change, please?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
final boolean proceessingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadId,
- threadIdx,
- processId,
- log,
- stateUpdaterEnabled,
- proceessingThreadsEnabled
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadId,
+ threadIdx,
+ processId,
+ log,
+ proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- threadId,
- log,
- stateUpdaterEnabled);
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ threadId,
+ log);
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
final boolean proceessingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadId,
- threadIdx,
- processId,
- log,
- stateUpdaterEnabled,
- proceessingThreadsEnabled
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadId,
+ threadIdx,
+ processId,
+ log,
+ proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- threadId,
- log,
- stateUpdaterEnabled);
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ threadId,
+ log);
final Tasks tasks = new Tasks(new LogContext(logPrefix));
final boolean processingThreadsEnabled =
- InternalConfig.processingThreadsEnabled(config.originals());
+ InternalConfig.processingThreadsEnabled(config.originals());
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
ThreadMetrics.addThreadStartTimeMetric(
- threadId,
- streamsMetrics,
- time.milliseconds()
+ threadId,
+ streamsMetrics,
+ time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(
- processId.toString(),
- threadId,
- streamsMetrics,
- (metricConfig, now) -> this.state().ordinal());
+ processId.toString(),
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state().ordinal());
ThreadMetrics.addThreadStateMetric(
- threadId,
- streamsMetrics,
- (metricConfig, now) -> this.state());
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state());
ThreadMetrics.addThreadBlockedTimeMetric(
- threadId,
- new StreamThreadTotalBlockedTime(
- mainConsumer,
- restoreConsumer,
- taskManager::totalProducerBlockedTime
- ),
- streamsMetrics
+ threadId,
+ new StreamThreadTotalBlockedTime(
+ mainConsumer,
+ restoreConsumer,
+ taskManager::totalProducerBlockedTime
+ ),
+ streamsMetrics
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -743,7 +728,7 @@ boolean runLoop() {
}
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks " + e.corruptedTasks()
+ " are corrupted. " +
- "Will close the task as dirty and re-create and
bootstrap from scratch.", e);
+ "Will close the task as dirty and re-create and
bootstrap from scratch.", e);
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() {
}
} else {
producerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve thread
producer client instance id.")
+ new TimeoutException("Could not retrieve thread
producer client instance id.")
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() {
}
} else {
producerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve thread
producer client instance id.")
+ new TimeoutException("Could not retrieve thread
producer client instance id.")
);
}
}
if (mainConsumerInstanceIdFuture.isDone()
- && (!stateUpdaterEnabled &&
restoreConsumerInstanceIdFuture.isDone())
- && producerInstanceIdFuture.isDone()) {
+ && restoreConsumerInstanceIdFuture.isDone()
+ && producerInstanceIdFuture.isDone()) {
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
final boolean proceessingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadId,
- threadIdx,
- processId,
- log,
- stateUpdaterEnabled,
- proceessingThreadsEnabled
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadId,
+ threadIdx,
+ processId,
+ log,
+ proceessingThreadsEnabled
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -237,13 +237,13 @@ State setState(final State newState) {
if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
log.debug("Ignoring request to transit from PENDING_SHUTDOWN
to {}: " +
- "only DEAD state is a valid next state",
newState);
+ "only DEAD state is a valid next state", newState);
Review Comment:
Could you revert this change, please?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
Review Comment:
Could you please revert this change?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
final boolean proceessingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadId,
- threadIdx,
- processId,
- log,
- stateUpdaterEnabled,
- proceessingThreadsEnabled
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadId,
+ threadIdx,
+ processId,
+ log,
+ proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- threadId,
- log,
- stateUpdaterEnabled);
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ threadId,
+ log);
final Tasks tasks = new Tasks(new LogContext(logPrefix));
final boolean processingThreadsEnabled =
- InternalConfig.processingThreadsEnabled(config.originals());
+ InternalConfig.processingThreadsEnabled(config.originals());
final DefaultTaskManager schedulingTaskManager =
- maybeCreateSchedulingTaskManager(processingThreadsEnabled,
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+ maybeCreateSchedulingTaskManager(processingThreadsEnabled,
topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater =
- maybeCreateAndStartStateUpdater(
- stateUpdaterEnabled,
- streamsMetrics,
- config,
- restoreConsumer,
- changelogReader,
- topologyMetadata,
- time,
- clientId,
- threadIdx
- );
+ maybeCreateAndStartStateUpdater(
+ streamsMetrics,
+ config,
+ restoreConsumer,
+ changelogReader,
+ topologyMetadata,
+ time,
+ clientId,
+ threadIdx
+ );
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
- time,
- config,
- adminClient,
- mainConsumer,
- restoreConsumer,
- changelogReader,
- originalReset,
- taskManager,
- stateUpdater,
- streamsMetrics,
- topologyMetadata,
- processId,
- threadId,
- logContext,
- referenceContainer.assignmentErrorCode,
- referenceContainer.nextScheduledRebalanceMs,
- referenceContainer.nonFatalExceptionsToHandle,
- shutdownErrorHook,
- streamsUncaughtExceptionHandler,
- cache::resize
+ time,
+ config,
+ adminClient,
+ mainConsumer,
+ restoreConsumer,
+ changelogReader,
+ originalReset,
+ taskManager,
+ stateUpdater,
+ streamsMetrics,
+ topologyMetadata,
+ processId,
+ threadId,
+ logContext,
+ referenceContainer.assignmentErrorCode,
+ referenceContainer.nextScheduledRebalanceMs,
+ referenceContainer.nonFatalExceptionsToHandle,
+ shutdownErrorHook,
+ streamsUncaughtExceptionHandler,
+ cache::resize
);
return streamThread.updateThreadMetadata(adminClientId(clientId));
}
private static DefaultTaskManager maybeCreateSchedulingTaskManager(final
boolean processingThreadsEnabled,
- final
boolean stateUpdaterEnabled,
final
TopologyMetadata topologyMetadata,
final
Time time,
final
String threadId,
final
Tasks tasks) {
if (processingThreadsEnabled) {
- if (!stateUpdaterEnabled) {
- throw new IllegalStateException("Processing threads require
the state updater to be enabled");
- }
final DefaultTaskManager defaultTaskManager = new
DefaultTaskManager(
- time,
- threadId,
- tasks,
- new DefaultTaskExecutorCreator(),
- topologyMetadata.taskExecutionMetadata(),
- 1
+ time,
+ threadId,
+ tasks,
+ new DefaultTaskExecutorCreator(),
+ topologyMetadata.taskExecutionMetadata(),
+ 1
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
- time,
- config,
- adminClient,
- mainConsumer,
- restoreConsumer,
- changelogReader,
- originalReset,
- taskManager,
- stateUpdater,
- streamsMetrics,
- topologyMetadata,
- processId,
- threadId,
- logContext,
- referenceContainer.assignmentErrorCode,
- referenceContainer.nextScheduledRebalanceMs,
- referenceContainer.nonFatalExceptionsToHandle,
- shutdownErrorHook,
- streamsUncaughtExceptionHandler,
- cache::resize
+ time,
+ config,
+ adminClient,
+ mainConsumer,
+ restoreConsumer,
+ changelogReader,
+ originalReset,
+ taskManager,
+ stateUpdater,
+ streamsMetrics,
+ topologyMetadata,
+ processId,
+ threadId,
+ logContext,
+ referenceContainer.assignmentErrorCode,
+ referenceContainer.nextScheduledRebalanceMs,
+ referenceContainer.nonFatalExceptionsToHandle,
+ shutdownErrorHook,
+ streamsUncaughtExceptionHandler,
+ cache::resize
);
return streamThread.updateThreadMetadata(adminClientId(clientId));
}
private static DefaultTaskManager maybeCreateSchedulingTaskManager(final
boolean processingThreadsEnabled,
- final
boolean stateUpdaterEnabled,
final
TopologyMetadata topologyMetadata,
final
Time time,
final
String threadId,
final
Tasks tasks) {
if (processingThreadsEnabled) {
- if (!stateUpdaterEnabled) {
- throw new IllegalStateException("Processing threads require
the state updater to be enabled");
- }
final DefaultTaskManager defaultTaskManager = new
DefaultTaskManager(
- time,
- threadId,
- tasks,
- new DefaultTaskExecutorCreator(),
- topologyMetadata.taskExecutionMetadata(),
- 1
+ time,
+ threadId,
+ tasks,
+ new DefaultTaskExecutorCreator(),
+ topologyMetadata.taskExecutionMetadata(),
+ 1
);
defaultTaskManager.startTaskExecutors();
return defaultTaskManager;
}
return null;
}
- private static StateUpdater maybeCreateAndStartStateUpdater(final boolean
stateUpdaterEnabled,
- final
StreamsMetricsImpl streamsMetrics,
+ private static StateUpdater maybeCreateAndStartStateUpdater(final
StreamsMetricsImpl streamsMetrics,
Review Comment:
Since this method now always creates and starts a state updater, could you
please rename it to `createAndStartStateUpdater()`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
final boolean proceessingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadId,
- threadIdx,
- processId,
- log,
- stateUpdaterEnabled,
- proceessingThreadsEnabled
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadId,
+ threadIdx,
+ processId,
+ log,
+ proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- threadId,
- log,
- stateUpdaterEnabled);
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ threadId,
+ log);
final Tasks tasks = new Tasks(new LogContext(logPrefix));
final boolean processingThreadsEnabled =
- InternalConfig.processingThreadsEnabled(config.originals());
+ InternalConfig.processingThreadsEnabled(config.originals());
final DefaultTaskManager schedulingTaskManager =
- maybeCreateSchedulingTaskManager(processingThreadsEnabled,
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+ maybeCreateSchedulingTaskManager(processingThreadsEnabled,
topologyMetadata, time, threadId, tasks);
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
- time,
- config,
- restorationLogContext,
- adminClient,
- restoreConsumer,
- userStateRestoreListener,
- userStandbyUpdateListener
+ time,
+ config,
+ restorationLogContext,
+ adminClient,
+ restoreConsumer,
+ userStateRestoreListener,
+ userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
final boolean proceessingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadId,
- threadIdx,
- processId,
- log,
- stateUpdaterEnabled,
- proceessingThreadsEnabled
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadId,
+ threadIdx,
+ processId,
+ log,
+ proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
- topologyMetadata,
- config,
- streamsMetrics,
- stateDirectory,
- changelogReader,
- threadId,
- log,
- stateUpdaterEnabled);
+ topologyMetadata,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ changelogReader,
+ threadId,
+ log);
final Tasks tasks = new Tasks(new LogContext(logPrefix));
final boolean processingThreadsEnabled =
- InternalConfig.processingThreadsEnabled(config.originals());
+ InternalConfig.processingThreadsEnabled(config.originals());
final DefaultTaskManager schedulingTaskManager =
- maybeCreateSchedulingTaskManager(processingThreadsEnabled,
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+ maybeCreateSchedulingTaskManager(processingThreadsEnabled,
topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater =
- maybeCreateAndStartStateUpdater(
- stateUpdaterEnabled,
- streamsMetrics,
- config,
- restoreConsumer,
- changelogReader,
- topologyMetadata,
- time,
- clientId,
- threadIdx
- );
+ maybeCreateAndStartStateUpdater(
+ streamsMetrics,
+ config,
+ restoreConsumer,
+ changelogReader,
+ topologyMetadata,
+ time,
+ clientId,
+ threadIdx
+ );
final TaskManager taskManager = new TaskManager(
- time,
- changelogReader,
- new ProcessId(processId),
- logPrefix,
- activeTaskCreator,
- standbyTaskCreator,
- tasks,
- topologyMetadata,
- adminClient,
- stateDirectory,
- stateUpdater,
- schedulingTaskManager
+ time,
+ changelogReader,
+ new ProcessId(processId),
+ logPrefix,
+ activeTaskCreator,
+ standbyTaskCreator,
+ tasks,
+ topologyMetadata,
+ adminClient,
+ stateDirectory,
+ stateUpdater,
+ schedulingTaskManager
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
ThreadMetrics.addThreadStartTimeMetric(
- threadId,
- streamsMetrics,
- time.milliseconds()
+ threadId,
+ streamsMetrics,
+ time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(
- processId.toString(),
- threadId,
- streamsMetrics,
- (metricConfig, now) -> this.state().ordinal());
+ processId.toString(),
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state().ordinal());
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
ThreadMetrics.addThreadStartTimeMetric(
- threadId,
- streamsMetrics,
- time.milliseconds()
+ threadId,
+ streamsMetrics,
+ time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(
- processId.toString(),
- threadId,
- streamsMetrics,
- (metricConfig, now) -> this.state().ordinal());
+ processId.toString(),
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state().ordinal());
ThreadMetrics.addThreadStateMetric(
- threadId,
- streamsMetrics,
- (metricConfig, now) -> this.state());
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state());
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -760,12 +745,12 @@ boolean runLoop() {
} catch (final UnsupportedVersionException e) {
final String errorMessage = e.getMessage();
if (errorMessage != null &&
- errorMessage.startsWith("Broker unexpectedly doesn't
support requireStable flag on version ")) {
+ errorMessage.startsWith("Broker unexpectedly doesn't
support requireStable flag on version ")) {
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
- time,
- config,
- adminClient,
- mainConsumer,
- restoreConsumer,
- changelogReader,
- originalReset,
- taskManager,
- stateUpdater,
- streamsMetrics,
- topologyMetadata,
- processId,
- threadId,
- logContext,
- referenceContainer.assignmentErrorCode,
- referenceContainer.nextScheduledRebalanceMs,
- referenceContainer.nonFatalExceptionsToHandle,
- shutdownErrorHook,
- streamsUncaughtExceptionHandler,
- cache::resize
+ time,
+ config,
+ adminClient,
+ mainConsumer,
+ restoreConsumer,
+ changelogReader,
+ originalReset,
+ taskManager,
+ stateUpdater,
+ streamsMetrics,
+ topologyMetadata,
+ processId,
+ threadId,
+ logContext,
+ referenceContainer.assignmentErrorCode,
+ referenceContainer.nextScheduledRebalanceMs,
+ referenceContainer.nonFatalExceptionsToHandle,
+ shutdownErrorHook,
+ streamsUncaughtExceptionHandler,
+ cache::resize
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
ThreadMetrics.addThreadStartTimeMetric(
- threadId,
- streamsMetrics,
- time.milliseconds()
+ threadId,
+ streamsMetrics,
+ time.milliseconds()
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() {
}
} else {
mainConsumerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve main consumer
client instance id.")
+ new TimeoutException("Could not retrieve main
consumer client instance id.")
);
}
}
- if (!stateUpdaterEnabled &&
!restoreConsumerInstanceIdFuture.isDone()) {
+ if (!restoreConsumerInstanceIdFuture.isDone()) {
Review Comment:
Shouldn't you delete the whole `if`-clause?
Since the state updater will always be enabled after this PR, this block
should always be skipped.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -657,13 +643,12 @@ public StreamThread(final Time time,
this.pollTime =
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
final int dummyThreadIdx = 1;
this.maxPollTimeMs = new
InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId",
"dummyClientId", dummyThreadIdx))
- .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+ .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -831,7 +816,7 @@ void maybeGetClientInstanceIds() {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
producerInstanceIdFuture.complete(
-
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
+
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -760,12 +745,12 @@ boolean runLoop() {
} catch (final UnsupportedVersionException e) {
final String errorMessage = e.getMessage();
if (errorMessage != null &&
- errorMessage.startsWith("Broker unexpectedly doesn't
support requireStable flag on version ")) {
+ errorMessage.startsWith("Broker unexpectedly doesn't
support requireStable flag on version ")) {
log.error("Shutting down because the Kafka cluster seems
to be on a too old version. " +
- "Setting {}=\"{}\" requires broker version 2.5
or higher.",
- StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
- StreamsConfig.EXACTLY_ONCE_V2);
+ "Setting {}=\"{}\" requires broker version
2.5 or higher.",
+ StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+ StreamsConfig.EXACTLY_ONCE_V2);
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -822,7 +807,7 @@ void maybeGetClientInstanceIds() {
}
} else {
restoreConsumerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve restore
consumer client instance id.")
+ new TimeoutException("Could not retrieve restore
consumer client instance id.")
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() {
}
} else {
mainConsumerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve main consumer
client instance id.")
+ new TimeoutException("Could not retrieve main
consumer client instance id.")
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -918,8 +903,8 @@ public void sendShutdownRequest(final AssignorError
assignorError) {
private void handleTaskMigrated(final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
- "This implies that this thread missed a rebalance and
dropped out of the consumer group. " +
- "Will close out all assigned tasks and rejoin the
consumer group.", e);
+ "This implies that this thread missed a rebalance and dropped
out of the consumer group. " +
+ "Will close out all assigned tasks and rejoin the consumer
group.", e);
Review Comment:
Could you please revert the additional indentation?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() {
}
} else {
producerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve thread
producer client instance id.")
+ new TimeoutException("Could not retrieve thread
producer client instance id.")
);
}
}
if (mainConsumerInstanceIdFuture.isDone()
- && (!stateUpdaterEnabled &&
restoreConsumerInstanceIdFuture.isDone())
- && producerInstanceIdFuture.isDone()) {
+ && restoreConsumerInstanceIdFuture.isDone()
Review Comment:
I believe you can also remove this line. With the state updater, fetching
this instance ID is done within the state updater itself.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]