cadonna commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923282700


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating 
tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = 
task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused 
tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not 
updating or paused.");
+            }
+        }
+
+        private void pauseTask(final TaskId taskId) {
+            final Task task = updatingTasks.get(taskId);
+            if (task != null) {
+                // do not need to unregister changelog partitions for paused 
tasks

Review Comment:
   Do we really need this comment?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java:
##########
@@ -47,6 +48,11 @@ public static TaskAndAction createRemoveTask(final TaskId 
taskId) {
         return new TaskAndAction(null, taskId, Action.REMOVE);
     }
 
+    public static TaskAndAction createPauseTask(final TaskId taskId) {
+        Objects.requireNonNull(taskId, "Task ID of task to pause is null!");
+        return new TaskAndAction(null, taskId, Action.PAUSE);
+    }
+

Review Comment:
   Could you please add a test in `TaskAndActionTest` for this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java:
##########
@@ -55,7 +61,7 @@ public Task getTask() {
     }
 
     public TaskId getTaskId() {
-        if (action != Action.REMOVE) {
+        if (action != Action.REMOVE && action != Action.PAUSE) {

Review Comment:
   Could you please add a test in `TaskAndActionTest` for this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -94,6 +94,19 @@ public int hashCode() {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Pause a task (active or standby) from restoring in the state updater.
+     *
+     * This method does not block until the task is paused.
+     *
+     * The task to be paused is not removed from the restored active tasks and 
the failed tasks.

Review Comment:
   Could you also add that removed tasks are not paused?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task 
task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {

Review Comment:
   `shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks` -> 
`shouldNotPauseActiveStatefulTaskInRestoredActiveTasks`
   
   The same applies to below test methods.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -143,6 +142,27 @@ public void shouldThrowIfStandbyTaskNotInStateRunning() {
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws 
Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+

Review Comment:
   Just for completeness, could you also add the case where a standby task is 
added and then an active task with the same ID is added?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating 
tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = 
task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused 
tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not 
updating or paused.");
+            }

Review Comment:
   I could not find a test that verifies this branch. Could you please add one, 
unless I missed it?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) 
throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {
+        if (tasks.length == 0) {
+            assertTrue(stateUpdater.getPausedTasks().isEmpty());
+        } else {
+            final Set<Task> expectedPausedTasks = mkSet(tasks);
+            final Set<Task> pausedTasks = new HashSet<>();
+            waitForCondition(
+                    () -> {
+                        pausedTasks.addAll(stateUpdater.getPausedTasks());
+                        return pausedTasks.containsAll(expectedPausedTasks)
+                                && pausedTasks.size() == 
expectedPausedTasks.size();
+                    },
+                    VERIFICATION_TIMEOUT,
+                    "Did not get all paused task within the given timeout!"
+            );
+            assertTrue(pausedTasks.stream()
+                .allMatch(task -> task.isActive() && task.state() == 
State.RESTORING
+                    || !task.isActive() && task.state() == State.RUNNING));

Review Comment:
   This verification does not contribute to testing correctness since the tasks are 
mocks whose return values are specified by the test code. I removed these 
verifications from the other verification methods in one of my last PRs. Could 
you please also remove this verification here?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1012,6 +1170,22 @@ private void verifyExceptionsAndFailedTasks(final 
ExceptionAndTasks... exception
         );
     }
 
+    private void verifyFailedTasks(final Task... tasks) throws Exception {
+        final List<Task> expectedFailedTasks = Arrays.asList(tasks);
+        final Set<Task> failedTasks = new HashSet<>();
+        waitForCondition(
+                () -> {
+                    for (final ExceptionAndTasks exceptionsAndTasks : 
stateUpdater.getExceptionsAndFailedTasks()) {
+                        failedTasks.addAll(exceptionsAndTasks.getTasks());
+                    }
+                    return failedTasks.containsAll(expectedFailedTasks)
+                            && failedTasks.size() == 
expectedFailedTasks.size();
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not get all exceptions and failed tasks within the given 
timeout!"
+        );
+    }
+

Review Comment:
   Why do you introduce this verification method? For testing exceptions, there 
is already `verifyExceptionsAndFailedTasks()`. In any case, you should verify 
the thrown exception and not only the failed tasks, because otherwise any 
exception, not only the expected one, will satisfy this verification.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -143,6 +142,27 @@ public void shouldThrowIfStandbyTaskNotInStateRunning() {
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws 
Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
     private void shouldThrowIfTaskNotInGivenState(final Task task, final State 
correctState) {

Review Comment:
   nit: Could you please move this method to below 
`shouldThrowIfStandbyTaskNotInStateRunning()`? That way, methods that are 
related to each other are grouped together.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -428,6 +456,55 @@ private void shouldRemoveStatefulTask(final Task task) 
throws Exception {
         verify(changelogReader).unregister(task.changelogPartitions());
     }
 
+    @Test
+    public void shouldPauseActiveStatefulTask() throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, never()).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws 
Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        stateUpdater.pause(task1.id());
+
+        verifyPausedTasks(task1);
+        verifyCheckpointTasks(true, task1);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks(task2);
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    private void shouldPauseStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {

Review Comment:
   nit: Could you please move all `shouldNotRemove...` tests before 
`shouldPauseActiveStatefulTask()` so that all remove-related tests are grouped 
together and all pause-related tests are grouped together?  



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -94,6 +94,19 @@ public int hashCode() {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Pause a task (active or standby) from restoring in the state updater.
+     *
+     * This method does not block until the task is paused.
+     *
+     * The task to be paused is not removed from the restored active tasks and 
the failed tasks.

Review Comment:
   Shouldn't this be:
   ```
   Restored tasks and failed tasks are not paused.
   ```
   
   or 
   
   ```
   Tasks in restored active tasks and failed tasks are not paused.
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) 
throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {

Review Comment:
   We also need to verify the paused tasks in the existing test methods. For 
instance, `shouldImmediatelyAddStatelessTasksToRestoredTasks()` should include 
the call `verifyPausedTasks()` to verify that a stateless task is not added to 
the paused tasks. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task 
task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.pause(task.id());
+        stateUpdater.pause(controlTask.id());
+
+        verifyPausedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromFailedTasks() throws 
Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotPauseStandbyTaskFromFailedTasks() throws Exception {

Review Comment:
   Could you please add tests 
`shouldNotPauseActiveStatefulTaskInRemovedTasks()` and 
`shouldNotPauseStandbyTaskInRemovedTasks()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to