guozhangwang commented on code in PR #12200:
URL: https://github.com/apache/kafka/pull/12200#discussion_r880841005


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -206,30 +230,46 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
+                log.debug("Stateless active task " + task.id() + " was added to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
             } else {
-                updatingTasks.put(task.id(), task);
+                if (task.isActive()) {
+                    updatingTasks.put(task.id(), task);

Review Comment:
   nit: we can move this line out of the if-else block.
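
   As a rough sketch of this nit: it assumes both branches of the inner if-else end up putting the task into `updatingTasks`, which this hunk does not show because the else branch is cut off. `HoistPutSketch`, `FakeTask`, and `addStatefulTask` are hypothetical names used only for illustration, not code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class HoistPutSketch {

    // Hypothetical stand-in for the real Task type; only what the sketch needs.
    static final class FakeTask {
        final String id;
        final boolean active;

        FakeTask(final String id, final boolean active) {
            this.id = id;
            this.active = active;
        }
    }

    final Map<String, FakeTask> updatingTasks = new HashMap<>();

    void addStatefulTask(final FakeTask task) {
        // Branch-specific work stays inside the if-else
        // (only a message here, which is an assumption).
        final String kind;
        if (task.active) {
            kind = "Stateful active task ";
        } else {
            kind = "Standby task ";
        }
        System.out.println(kind + task.id + " was added to the state updater");

        // Common to both branches, so it is done once after the if-else.
        updatingTasks.put(task.id, task);
    }

    public static void main(final String[] args) {
        final HoistPutSketch updater = new HoistPutSketch();
        updater.addStatefulTask(new FakeTask("0_0", true));
        updater.addStatefulTask(new FakeTask("0_1", false));
        System.out.println(updater.updatingTasks.keySet());
    }
}
```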



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -74,30 +76,44 @@ public Collection<Task> getAllUpdatingTasks() {
             return updatingTasks.values();
         }
 
+        public Collection<StandbyTask> getUpdatingStandbyTasks() {
+            return updatingTasks.values().stream()
+                .filter(t -> !t.isActive())
+                .map(t -> (StandbyTask) t)
+                .collect(Collectors.toList());
+        }
+
+        public boolean onlyStandbyTasksLeft() {
+            return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive());

Review Comment:
   I think we do not need the first condition: even if there are no tasks, 
calling `transitToUpdateStandby` does no harm, since the thread would return 
immediately from `changelogReader.restore` and then block in 
`waitIfAllChangelogsCompletelyRead` anyway. Maybe we can just rename it to 
`noActiveTaskLeft`, which only checks the second condition.
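
   A minimal sketch of the renamed check, assuming a stand-in `Task` interface rather than the real Kafka Streams type (`NoActiveTaskLeftSketch` is a hypothetical name). It relies on `Stream.allMatch` returning `true` for an empty stream, so without the emptiness guard the method also reports `true` when no tasks are being updated.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class NoActiveTaskLeftSketch {

    // Hypothetical stand-in for Task; only isActive() matters here.
    interface Task {
        boolean isActive();
    }

    // Only the second condition from the diff: no emptiness check.
    static boolean noActiveTaskLeft(final Collection<Task> updatingTasks) {
        return updatingTasks.stream().allMatch(t -> !t.isActive());
    }

    public static void main(final String[] args) {
        final Task active = () -> true;
        final Task standby = () -> false;

        // Empty collection: allMatch is vacuously true.
        System.out.println(noActiveTaskLeft(Collections.emptyList()));
        // Only standby tasks left.
        System.out.println(noActiveTaskLeft(Collections.singletonList(standby)));
        // At least one active task still updating.
        System.out.println(noActiveTaskLeft(Arrays.asList(active, standby)));
    }
}
```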



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -356,6 +433,36 @@ public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Ex
         assertTrue(stateUpdater.getAllTasks().isEmpty());
     }
 
+    private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception {
+        final Set<StreamTask> expectedRestoredTasks = mkSet(tasks);
+        final Set<StreamTask> restoredTasks = new HashSet<>();
+        waitForCondition(
+            () -> {
+                restoredTasks.addAll(stateUpdater.getRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
+                return restoredTasks.size() == expectedRestoredTasks.size();
+            },
+            VERIFICATION_TIMEOUT,
+            "Did not get any restored active task within the given timeout!"

Review Comment:
   nit: `any` -> `all`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -163,21 +172,12 @@ public void shouldRestoreSingleActiveStatefulTask() throws Exception {
 
         stateUpdater.add(task);
 
-        final Set<StreamTask> expectedRestoredTasks = Collections.singleton(task);
-        final Set<StreamTask> restoredTasks = new HashSet<>();
-        waitForCondition(
-            () -> {
-                restoredTasks.addAll(stateUpdater.getRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
-                return restoredTasks.size() == expectedRestoredTasks.size();
-            },
-            VERIFICATION_TIMEOUT,
-            "Did not get any restored active task within the given timeout!"
-        );
-        assertTrue(restoredTasks.containsAll(expectedRestoredTasks));
-        assertEquals(expectedRestoredTasks.size(), restoredTasks.stream().filter(t -> t.state() == State.RESTORING).count());
+        verifyRestoredActiveTasks(task);
         assertTrue(stateUpdater.getAllTasks().isEmpty());
-        verify(changelogReader, atLeast(3)).restore(anyMap());
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, atLeast(1)).restore(anyMap());

Review Comment:
   Why change from `atLeast(3)` to `atLeast(1)`?



