cadonna commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r901395029


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -85,7 +86,7 @@ public Collection<StandbyTask> getUpdatingStandbyTasks() {
         }
 
         public boolean onlyStandbyTasksLeft() {
-            return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().allMatch(t -> !t.isActive());
+            return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);

Review Comment:
   I did the same change in https://github.com/apache/kafka/pull/12312 🙂 
   I will revert the change in my PR to avoid merge conflicts.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCheckpointTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+        final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = 
createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNeverCheckpointTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCheckpointTasks(final boolean enforceCheckpoint, final 
Task... tasks) throws Exception {

Review Comment:
   ```suggestion
       private void verifyCheckpointTasks(final boolean enforceCheckpoint, 
final Task... tasks) {
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##########
@@ -236,6 +236,16 @@ public static String safeUniqueTestName(final Class<?> 
testClass, final TestName
                 .replace('=', '_');
     }
 
+    public static String safeUniqueClassTestName(final Class<?> testClass) {
+        return (testClass.getSimpleName())
+                .replace(':', '_')
+                .replace('.', '_')
+                .replace('[', '_')
+                .replace(']', '_')
+                .replace(' ', '_')
+                .replace('=', '_');
+    }
+

Review Comment:
   This should be dead code now that we do not need this method in 
`DefaultStateUpdater`. Could you please remove it?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,23 @@ private void addTaskToRestoredTasks(final StreamTask task) 
{
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCommitRestoringTasks(final long now) {

Review Comment:
   Could you please rename to `maybeCheckpointUpdatingTasks()`? 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -346,42 +369,48 @@ private void shouldRemoveStatefulTask(final Task task) 
throws Exception {
             .thenReturn(false);
         stateUpdater.add(task);
 
-        stateUpdater.remove(TASK_0_0);
+        stateUpdater.remove(task.id());

Review Comment:
   Thank you!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCheckpointTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+        final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = 
createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNeverCheckpointTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCheckpointTasks(final boolean enforceCheckpoint, final 
Task... tasks) throws Exception {
+        for (final Task task : tasks) {
+            verify(task, 
timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);
+        }
+    }
+
+    private void verifyNeverCheckpointTasks(final Task... tasks) throws 
Exception {

Review Comment:
   ```suggestion
       private void verifyNeverCheckpointTasks(final Task... tasks) {
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -470,6 +500,7 @@ public void 
shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutT
         verifyRemovedTasks();
         verifyUpdatingTasks();
         verifyRestoredActiveTasks();
+        verifyNeverCheckpointTasks(task1, task2);

Review Comment:
   That might lead to flakiness, because `task1` might checkpoint before 
`task2` is added and the exception is thrown.
   If I run only this test, it indeed fails for me.
   Setting the commit interval to `Integer.MAX_VALUE` with
   ```
   stateUpdater.shutdown(Duration.ofMinutes(1));
   final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
   stateUpdater = new DefaultStateUpdater(config, changelogReader, 
offsetResetter, Time.SYSTEM);
   ```
    would fix it, I guess.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCheckpointTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+        final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = 
createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNeverCheckpointTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCheckpointTasks(final boolean enforceCheckpoint, final 
Task... tasks) throws Exception {
+        for (final Task task : tasks) {
+            verify(task, 
timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);

Review Comment:
   nit: I think the timeout is not needed here, since you use this verification 
after the tasks are either restored or removed. In those cases 
`maybeCheckpoint()` should have already been called by then.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -531,6 +563,7 @@ public void 
shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException()
         verifyUpdatingTasks(task3);
         verifyRestoredActiveTasks();
         verifyRemovedTasks();
+        verifyNeverCheckpointTasks(task1, task2);

Review Comment:
   Same flakiness as above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -418,6 +447,7 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task 
task) throws Exceptio
         stateUpdater.remove(controlTask.id());
 
         verifyRemovedTasks(controlTask);
+        verifyCheckpointTasks(true, controlTask);

Review Comment:
   Also here I think we do not need this verification.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -173,6 +192,7 @@ public void shouldRestoreSingleActiveStatefulTask() throws 
Exception {
         stateUpdater.add(task);
 
         verifyRestoredActiveTasks(task);
+        verifyCheckpointTasks(true, task);

Review Comment:
   Putting it here since I cannot comment on existing code: Could you add 
`verifyNeverCheckpointTasks(tasks);` to 
`shouldImmediatelyAddStatelessTasksToRestoredTasks()` to verify that a 
stateless task is not checkpointed after restoration?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) 
{
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCommitRestoringTasks(final long now) {
+            final long elapsedMsSinceLastCommit = now - lastCommitMs;
+            if (elapsedMsSinceLastCommit > commitIntervalMs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Committing all restoring tasks since {}ms has 
elapsed (commit interval is {}ms)",
+                        elapsedMsSinceLastCommit, commitIntervalMs);
+                }
+
+                for (final Task task : updatingTasks.values()) {
+                    // do not enforce checkpointing during restoration if its 
position has not advanced much

Review Comment:
   Looking forward to the follow-up PR 🙂 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCheckpointTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+        final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = 
createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNeverCheckpointTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCheckpointTasks(final boolean enforceCheckpoint, final 
Task... tasks) throws Exception {
+        for (final Task task : tasks) {
+            verify(task, 
timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);
+        }
+    }
+
+    private void verifyNeverCheckpointTasks(final Task... tasks) throws 
Exception {
+        for (final Task task : tasks) {
+            verify(task, never()).maybeCheckpoint(true);
+            verify(task, never()).maybeCheckpoint(false);

Review Comment:
   ```suggestion
               verify(task, never()).maybeCheckpoint(anyBoolean());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCheckpointTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+        final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);

Review Comment:
   It is better to do this; otherwise, verifications like 
`verifyExceptionsAndFailedTasks()` will not work. You do not use them in this 
method, but the change makes the test future-proof.
   ```suggestion
           stateUpdater.shutdown(Duration.ofMinutes(1));
           final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
           stateUpdater = new DefaultStateUpdater(config, changelogReader, 
offsetResetter, Time.SYSTEM);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java:
##########
@@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task {
      * @throws StreamsException fatal error when flushing the state store, for 
example sending changelog records failed
      *                          or flushing state store get IO errors; such 
error should cause the thread to die
      */
-    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+    @Override
+    public void maybeCheckpoint(final boolean enforceCheckpoint) {

Review Comment:
   Now that this method is public, could you please add unit tests for this 
method?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -346,42 +369,48 @@ private void shouldRemoveStatefulTask(final Task task) 
throws Exception {
             .thenReturn(false);
         stateUpdater.add(task);
 
-        stateUpdater.remove(TASK_0_0);
+        stateUpdater.remove(task.id());
 
         verifyRemovedTasks(task);
+        verifyCheckpointTasks(true, task);
         verifyRestoredActiveTasks();
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
-        
verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+        verify(changelogReader).unregister(task.changelogPartitions());
     }
 
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {
         final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
-        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task, 
Collections.singleton(TOPIC_PARTITION_A_0));
     }
 
     @Test
     public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws 
Exception {
         final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
-        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task, 
Collections.emptySet());
     }
 
-    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask 
task) throws Exception {
+    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask 
task, final Set<TopicPartition> completedChangelogs) throws Exception {
         final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
         when(changelogReader.completedChangelogs())
-            .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+            .thenReturn(completedChangelogs);
         when(changelogReader.allChangelogsCompleted())
             .thenReturn(false);
         stateUpdater.add(task);
         stateUpdater.add(controlTask);
         verifyRestoredActiveTasks(task);
 
+        // for stateless task, we should complete it without trying to commit
+        if (!completedChangelogs.isEmpty())
+            verifyCheckpointTasks(true, task);
+
         stateUpdater.remove(task.id());
         stateUpdater.remove(controlTask.id());
 
         verifyRemovedTasks(controlTask);
         verifyRestoredActiveTasks(task);
+        verifyCheckpointTasks(true, controlTask);

Review Comment:
   I do not think we need to verify checkpointing here and above since in this 
test we are only concerned about the tasks not being removed from the restored 
output queue. That simplifies the tests since you can remove the other related 
changes to `shouldNotRemoveStatelessTaskFromRestoredActiveTasks()` and 
`shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -552,6 +585,7 @@ public void 
shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Ex
         verifyUpdatingTasks();
         verifyRestoredActiveTasks();
         verifyRemovedTasks();
+        verifyNeverCheckpointTasks(task1, task2);

Review Comment:
   Same flakiness as above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -69,15 +77,27 @@ class DefaultStateUpdaterTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
     private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
+    private final StreamsConfig config = new 
StreamsConfig(configProps(COMMIT_INTERVAL));
     private final ChangelogReader changelogReader = 
mock(ChangelogReader.class);
     private final java.util.function.Consumer<Set<TopicPartition>> 
offsetResetter = topicPartitions -> { };
-    private final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM);
+    private final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
 
     @AfterEach
     public void tearDown() {
         stateUpdater.shutdown(Duration.ofMinutes(1));
     }
 
+    private Properties configProps(final int commitInterval) {
+        return mkObjectProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
safeUniqueClassTestName(getClass())),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:2171"),
+                mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
commitInterval),
+                // we need to make sure that transaction timeout is not lower 
than commit interval for EOS

Review Comment:
   If there is a check in the production code, we do not need to add this 
comment since the test would fail anyway, right? I would remove the comment.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -506,6 +537,7 @@ public void 
shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask
         verifyUpdatingTasks(task2);
         verifyRestoredActiveTasks();
         verifyRemovedTasks();
+        verifyNeverCheckpointTasks(task1, task3);

Review Comment:
   Same flakiness as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to