cadonna commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441615070


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1164,9 +1243,12 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
+        final StreamTask taskToCloseReviveAndUpdateInputPartitions = 
statefulTask(taskId04, taskId04ChangelogPartitions)
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId04Partitions).build();

Review Comment:
   nit:
   ```suggestion
               .inState(State.RESTORING)
               .withInputPartitions(taskId04Partitions).build();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)

Review Comment:
   nit:
   Could you please use more meaningful names for those task variables? 
Something like `activeTask` and `standbyTask` would already be better.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() {
                 stateUpdater.add(task);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
+            } else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+                if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+                    task.revive();
+                    task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+                    stateUpdater.add(task);

Review Comment:
   Where is the task initialized?
   The task is closed cleanly and the call to `revive()` sets the task to 
`CREATED`. However, the task is never initialized after that.
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final 
long now,
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToAddBack(task.id())) {
                 stateUpdater.add(task);
+            } else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+                if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+                    task.revive();
+                    task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));

Review Comment:
   My comment above also applies here.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        when(stateUpdater.hasRemovedTasks()).thenReturn(true);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, 
task01));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
+        
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions);
+        
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions);

Review Comment:
   I thought a standby task can never be added to this pending action.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to