cadonna commented on code in PR #12279: URL: https://github.com/apache/kafka/pull/12279#discussion_r901395029
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -85,7 +86,7 @@ public Collection<StandbyTask> getUpdatingStandbyTasks() { } public boolean onlyStandbyTasksLeft() { - return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive()); + return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive); Review Comment: I did the same change in https://github.com/apache/kafka/pull/12312 🙂 I will revert the change in my PR to avoid merge conflicts. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldAutoCommitTasksOnInterval() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + sleep(COMMIT_INTERVAL); + + verifyExceptionsAndFailedTasks(); + verifyCheckpointTasks(false, task1, task2, task3, task4); + } + + @Test + public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); + + try { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + verifyNeverCheckpointTasks(task1, task2, task3, task4); + } finally { + stateUpdater.shutdown(Duration.ofMinutes(1)); + } + } + + private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception { Review Comment: ```suggestion private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) { ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java: ########## @@ -236,6 +236,16 @@ public static String safeUniqueTestName(final Class<?> testClass, final TestName .replace('=', '_'); } + public static String safeUniqueClassTestName(final Class<?> testClass) { + return (testClass.getSimpleName()) + .replace(':', '_') + .replace('.', '_') + .replace('[', '_') + .replace(']', '_') + .replace(' ', '_') + .replace('=', '_'); + } + Review Comment: This should be dead code now that we do not need this method in `DefaultStateUpdater`. Could you please remove it? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -290,6 +295,23 @@ private void addTaskToRestoredTasks(final StreamTask task) { restoredActiveTasksLock.unlock(); } } + + private void maybeCommitRestoringTasks(final long now) { Review Comment: Could you please rename to `maybeCheckpointUpdatingTasks()`? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -346,42 +369,48 @@ private void shouldRemoveStatefulTask(final Task task) throws Exception { .thenReturn(false); stateUpdater.add(task); - stateUpdater.remove(TASK_0_0); + stateUpdater.remove(task.id()); Review Comment: Thank you! ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldAutoCommitTasksOnInterval() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + sleep(COMMIT_INTERVAL); + + verifyExceptionsAndFailedTasks(); + verifyCheckpointTasks(false, task1, task2, task3, task4); + } + + @Test + public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); + + try { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + verifyNeverCheckpointTasks(task1, task2, task3, task4); + } finally { + stateUpdater.shutdown(Duration.ofMinutes(1)); + } + } + + private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception { + for (final Task task : tasks) { + verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint); + } + } + + private void verifyNeverCheckpointTasks(final Task... tasks) throws Exception { Review Comment: ```suggestion private void verifyNeverCheckpointTasks(final Task... tasks) { ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -470,6 +500,7 @@ public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutT verifyRemovedTasks(); verifyUpdatingTasks(); verifyRestoredActiveTasks(); + verifyNeverCheckpointTasks(task1, task2); Review Comment: That might lead to flakiness, because `task1` might checkpoint before `task2` is added and the exception is thrown. If I run only this test, it indeed fails for me. Setting the commit interval to `Integer.MAX_VALUE` with ``` stateUpdater.shutdown(Duration.ofMinutes(1)); final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); ``` would fix it, I guess. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldAutoCommitTasksOnInterval() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + sleep(COMMIT_INTERVAL); + + verifyExceptionsAndFailedTasks(); + verifyCheckpointTasks(false, task1, task2, task3, task4); + } + + @Test + public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); + + try { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + verifyNeverCheckpointTasks(task1, task2, task3, task4); + } finally { + stateUpdater.shutdown(Duration.ofMinutes(1)); + } + } + + private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception { + for (final Task task : tasks) { + verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint); Review Comment: nit: I think the timeout is not needed here, since you use this verification after the tasks are either restored or removed. In those cases `maybeCheckpoint()` should have already be called by then. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -531,6 +563,7 @@ public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() verifyUpdatingTasks(task3); verifyRestoredActiveTasks(); verifyRemovedTasks(); + verifyNeverCheckpointTasks(task1, task2); Review Comment: Same flakiness as above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -418,6 +447,7 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exceptio stateUpdater.remove(controlTask.id()); verifyRemovedTasks(controlTask); + verifyCheckpointTasks(true, controlTask); Review Comment: Also here I think we do not need this verification. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -173,6 +192,7 @@ public void shouldRestoreSingleActiveStatefulTask() throws Exception { stateUpdater.add(task); verifyRestoredActiveTasks(task); + verifyCheckpointTasks(true, task); Review Comment: Putting it here since I cannot comment on existing code: Could you add ` verifyNeverCheckpointTasks(tasks);` to `shouldImmediatelyAddStatelessTasksToRestoredTasks()` to verify that a stateless task is not checkpointed after restoration? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) { restoredActiveTasksLock.unlock(); } } + + private void maybeCommitRestoringTasks(final long now) { + final long elapsedMsSinceLastCommit = now - lastCommitMs; + if (elapsedMsSinceLastCommit > commitIntervalMs) { + if (log.isDebugEnabled()) { + log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)", + elapsedMsSinceLastCommit, commitIntervalMs); + } + + for (final Task task : updatingTasks.values()) { + // do not enforce checkpointing during restoration if its position has not advanced much Review Comment: Looking forward to the follow-up PR 🙂 ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldAutoCommitTasksOnInterval() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + sleep(COMMIT_INTERVAL); + + verifyExceptionsAndFailedTasks(); + verifyCheckpointTasks(false, task1, task2, task3, task4); + } + + @Test + public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); + + try { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + verifyNeverCheckpointTasks(task1, task2, task3, task4); + } finally { + stateUpdater.shutdown(Duration.ofMinutes(1)); + } + } + + private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception { + for (final Task task : tasks) { + verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint); + } + } + + private void verifyNeverCheckpointTasks(final Task... tasks) throws Exception { + for (final Task task : tasks) { + verify(task, never()).maybeCheckpoint(true); + verify(task, never()).maybeCheckpoint(false); Review Comment: ```suggestion verify(task, never()).maybeCheckpoint(anyBoolean()); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldAutoCommitTasksOnInterval() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + sleep(COMMIT_INTERVAL); + + verifyExceptionsAndFailedTasks(); + verifyCheckpointTasks(false, task1, task2, task3, task4); + } + + @Test + public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); Review Comment: It is better to do this, otherwise verifications like `verifyExceptionsAndFailedTasks()` will not work. You do not use them in this method, but the change make the test future-proof. ```suggestion stateUpdater.shutdown(Duration.ofMinutes(1)); final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java: ########## @@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task { * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed * or flushing state store get IO errors; such error should cause the thread to die */ - protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { + @Override + public void maybeCheckpoint(final boolean enforceCheckpoint) { Review Comment: Now that this method is public, could you please add unit tests for this method? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -346,42 +369,48 @@ private void shouldRemoveStatefulTask(final Task task) throws Exception { .thenReturn(false); stateUpdater.add(task); - stateUpdater.remove(TASK_0_0); + stateUpdater.remove(task.id()); verifyRemovedTasks(task); + verifyCheckpointTasks(true, task); verifyRestoredActiveTasks(); verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); - verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0)); + verify(changelogReader).unregister(task.changelogPartitions()); } @Test public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - shouldNotRemoveTaskFromRestoredActiveTasks(task); + shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.singleton(TOPIC_PARTITION_A_0)); } @Test public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0); - shouldNotRemoveTaskFromRestoredActiveTasks(task); + shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.emptySet()); } - private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception { + private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task, final Set<TopicPartition> completedChangelogs) throws Exception { final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()) - .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); + .thenReturn(completedChangelogs); when(changelogReader.allChangelogsCompleted()) .thenReturn(false); stateUpdater.add(task); stateUpdater.add(controlTask); verifyRestoredActiveTasks(task); + // for stateless task, we should complete it without trying to commit + if (!completedChangelogs.isEmpty()) + verifyCheckpointTasks(true, task); + stateUpdater.remove(task.id()); stateUpdater.remove(controlTask.id()); verifyRemovedTasks(controlTask); verifyRestoredActiveTasks(task); + verifyCheckpointTasks(true, controlTask); Review Comment: I do not think we need to verify checkpointing here and above since in this test we are only concerned about the tasks not being removed from the restored output queue. That simplifies the tests since you can remove the other related changes to `shouldNotRemoveStatelessTaskFromRestoredActiveTasks()` and `shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -552,6 +585,7 @@ public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Ex verifyUpdatingTasks(); verifyRestoredActiveTasks(); verifyRemovedTasks(); + verifyNeverCheckpointTasks(task1, task2); Review Comment: Same flakiness as above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -69,15 +77,27 @@ class DefaultStateUpdaterTest { private final static TaskId TASK_1_0 = new TaskId(1, 0); private final static TaskId TASK_1_1 = new TaskId(1, 1); + private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { }; - private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM); + private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); @AfterEach public void tearDown() { stateUpdater.shutdown(Duration.ofMinutes(1)); } + private Properties configProps(final int commitInterval) { + return mkObjectProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueClassTestName(getClass())), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval), + // we need to make sure that transaction timeout is not lower than commit interval for EOS Review Comment: If there is a check in the production code, we do not need to add this comment since the test would fail anyways, right? I would remove the comment. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -506,6 +537,7 @@ public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask verifyUpdatingTasks(task2); verifyRestoredActiveTasks(); verifyRemovedTasks(); + verifyNeverCheckpointTasks(task1, task3); Review Comment: Same flakiness as above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org