cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2024268591
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -616,7 +598,7 @@ public void flushCache() {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);
- if (!stateUpdaterEnabled && changelogReader != null) {
+ if (changelogReader != null) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}
Review Comment:
Since `stateUpdaterEnabled` is effectively always `true` once we remove the
flag, the original `if`-condition is always `false`, so the code it guards is
never executed.
IMO, the changelog reader can be removed from the `ProcessorStateManager`
since registering changelog topics is done in the state updater. Only the old
code path that did not use the state updater needed the changelog reader here.
If the `ProcessorStateManager` does not need the changelog reader, the
active task creator and the standby task creator do not need it either.
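The reasoning can be checked mechanically. The plain-Java sketch below uses no Kafka classes, and the method names are illustrative only. It evaluates the original guard `!stateUpdaterEnabled && changelogReader != null` with the flag pinned to `true`, and compares it with the guard as rewritten in the PR. The same guard appears in `recycle()`.

```java
public class GuardFolding {

    // Original guard in close()/recycle(): only the old code path unregisters changelogs.
    static boolean originalGuard(boolean stateUpdaterEnabled, boolean changelogReaderPresent) {
        return !stateUpdaterEnabled && changelogReaderPresent;
    }

    // Guard as rewritten in the PR: the flag was dropped instead of being folded to a constant.
    static boolean rewrittenGuard(boolean changelogReaderPresent) {
        return changelogReaderPresent;
    }

    public static void main(String[] args) {
        final boolean stateUpdaterEnabled = true; // the only value left once the flag is removed
        for (boolean readerPresent : new boolean[] {false, true}) {
            System.out.printf("reader present=%b -> original=%b, rewritten=%b%n",
                readerPresent,
                originalGuard(stateUpdaterEnabled, readerPresent),
                rewrittenGuard(readerPresent));
        }
    }
}
```

The original guard prints `false` in both rows. The rewritten guard is `true` whenever a reader is present. Folding `!true && x` yields `false`, not `x`, so the guarded block is dead code and can be deleted.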
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
- final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
-
final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
- final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId;
+ final String restorationThreadId = stateUpdaterId;
Review Comment:
You could directly use `stateUpdaterId` instead of `restorationThreadId`
since the distinction between state updater ID and thread ID for restoration
does not hold anymore.
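With the flag gone, the restoration thread ID is always the state updater ID. That ID is derived from the stream thread ID by string replacement, so a separate variable adds nothing. In the minimal sketch below, the two constant values are assumptions for illustration; check the real constants used in `StreamThread`.

```java
public class StateUpdaterIdDemo {

    // Assumed values for illustration; the real constants are used by StreamThread.
    static final String THREAD_ID_SUBSTRING = "-StreamThread-";
    static final String STATE_UPDATER_ID_SUBSTRING = "-StateUpdater-";

    static String stateUpdaterId(String clientId, int threadIdx) {
        final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
        // With the state updater always enabled, this is also the ID used for restoration,
        // so callers can use it directly instead of a separate restorationThreadId.
        return threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
    }

    public static void main(String[] args) {
        System.out.println(stateUpdaterId("my-app-client", 1)); // my-app-client-StateUpdater-1
    }
}
```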
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -407,30 +406,6 @@ public long position(final TopicPartition partition) {
}
}
- @ParameterizedTest
- @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
- public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type) {
- setupStateManagerMock(type);
- setupStoreMetadata();
- setupStore();
- shouldPollWithRightTimeout(true, type);
- }
-
- @ParameterizedTest
- @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
- public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) {
- setupStateManagerMock(type);
- setupStoreMetadata();
- setupStore();
- shouldPollWithRightTimeout(false, type);
- }
-
- private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) {
- final Properties properties = new Properties();
- properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
- shouldPollWithRightTimeout(properties, type);
- }
-
@ParameterizedTest
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) {
Review Comment:
Could you please inline `shouldPollWithRightTimeout()` into this test and
rename this test to `shouldPollWithRightTimeout()`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1716,15 +1699,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
runOnce(processingThreadsEnabled);
// the third actually polls, processes the record, and throws the corruption exception
- if (stateUpdaterEnabled) {
- TestUtils.waitForCondition(
+ TestUtils.waitForCondition(
() -> thread.taskManager().checkStateUpdater(
mockTime.milliseconds(),
topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
),
10 * 1000,
"State updater never returned tasks.");
Review Comment:
Please fix the indentation here.
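Once the surrounding `if (stateUpdaterEnabled) { ... }` is removed, the call becomes an ordinary statement in the test body. It should be indented like one, with its arguments one level deeper. The self-contained sketch below shows that layout. Its `waitForCondition` is a hypothetical stand-in, written with a `BooleanSupplier`, that assumes `TestUtils.waitForCondition(condition, maxWaitMs, message)` re-checks the condition until it holds or the timeout expires. It is not Kafka's implementation.

```java
import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {

    // Hypothetical stand-in for TestUtils.waitForCondition(condition, maxWaitMs, message):
    // re-checks the condition every 10 ms and fails once maxWaitMs has elapsed.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms. " + message);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new AssertionError("Interrupted while waiting. " + message, e);
            }
        }
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // No enclosing if-block any more: the call sits at the test body's indentation
        // and its arguments are indented one level deeper.
        waitForCondition(
            () -> System.currentTimeMillis() - start >= 50,
            10 * 1000,
            "State updater never returned tasks.");
        System.out.println("condition met");
    }
}
```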
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -664,7 +646,7 @@ else if (exception instanceof StreamsException)
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);
- if (!stateUpdaterEnabled && changelogReader != null) {
+ if (changelogReader != null) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
Review Comment:
See my comment above.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1736,15 +1718,13 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
// Now, we can handle the corruption
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
- if (stateUpdaterEnabled) {
- TestUtils.waitForCondition(
+ TestUtils.waitForCondition(
() -> thread.taskManager().checkStateUpdater(
mockTime.milliseconds(),
topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
),
10 * 1000,
"State updater never returned tasks.");
- }
Review Comment:
See my comment above about indentation.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2491,26 +2467,17 @@ public Set<TopicPartition> partitions() {
"K2".getBytes(),
"V2".getBytes()));
- if (stateUpdaterEnabled) {
- TestUtils.waitForCondition(
+ TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().size() == 0,
"Never get the assignment");
Review Comment:
Please fix the indentation, and could you also fix the typo? It should be
`Never got the assignment`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1222,15 +1142,15 @@ private long pollPhase() {
final ConsumerRecords<byte[], byte[]> records;
log.debug("Invoking poll on main Consumer");
- if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
+ if (state == State.PARTITIONS_ASSIGNED) {
// try to fetch some records with zero poll millis
// to unblock the restoration as soon as possible
records = pollRequests(Duration.ZERO);
Review Comment:
You can get rid of this branch. Polling with duration zero during
`PARTITIONS_ASSIGNED` only applies to the old code path. With the state updater,
polling should use the configured poll time, as on line 1153.
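The behavioural difference is easiest to see on the timeout chosen in `PARTITIONS_ASSIGNED`. The sketch below is hypothetical and models only that state. `pollTime` stands for the configured poll time, and the 100 ms value is arbitrary. The sketch contrasts the code before the PR, the PR as submitted, and the suggested version.

```java
import java.time.Duration;

public class PollTimeoutSketch {

    // Before the PR: zero-duration poll in PARTITIONS_ASSIGNED only on the old code path.
    static Duration timeoutBefore(boolean stateUpdaterEnabled, Duration pollTime) {
        if (!stateUpdaterEnabled) {
            return Duration.ZERO; // unblock restoration on the stream thread quickly
        }
        return pollTime;
    }

    // PR as submitted: the flag was dropped from the condition, so zero is always used
    // (pollTime is ignored in this state).
    static Duration timeoutAsSubmitted(Duration pollTime) {
        return Duration.ZERO;
    }

    // Suggested: remove the branch, so PARTITIONS_ASSIGNED polls with the configured time.
    static Duration timeoutSuggested(Duration pollTime) {
        return pollTime;
    }

    public static void main(String[] args) {
        final Duration pollTime = Duration.ofMillis(100); // illustrative value
        System.out.println(timeoutBefore(true, pollTime)); // PT0.1S
        System.out.println(timeoutAsSubmitted(pollTime));  // PT0S
        System.out.println(timeoutSuggested(pollTime));    // PT0.1S
    }
}
```

Only the suggested version preserves what the state updater path did before the flag was removed.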