cadonna commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441615070

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
+        final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions)
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId04Partitions).build();

Review Comment:
   nit:
   ```suggestion
               .inState(State.RESTORING)
               .withInputPartitions(taskId04Partitions).build();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1127,6 +1175,36 @@ public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
+    @Test
+    public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)

Review Comment:
   nit: Could you please use more meaningful names for those task variables? Something like `activeTask` and `standbyTask` would already be better.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() {
                     stateUpdater.add(task);
                 } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                     closeTaskClean(task, tasksToCloseDirty, taskExceptions);
+                } else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) {
+                    if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+                        task.revive();
+                        task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+                        stateUpdater.add(task);

Review Comment:
   Where is the task initialized?
   The task is closed cleanly and the call to `revive()` sets the task to `CREATED`. However, the task is never initialized after that.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final long now,
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToAddBack(task.id())) {
                 stateUpdater.add(task);
+            } else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) {
+                if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+                    task.revive();
+                    task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));

Review Comment:
   My comment above also applies here.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1127,6 +1175,36 @@ public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
+    @Test
+    public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING).build();
+        when(stateUpdater.hasRemovedTasks()).thenReturn(true);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
+        when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions);
+        when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions);

Review Comment:
   I thought a standby task can never be added to this pending action.
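
To make the initialization concern about the close, revive, and re-add branch concrete, here is a minimal self-contained sketch. It is a toy model and not Kafka's code: `ReviveSketch`, `ToyTask`, `ToyStateUpdater`, and the method names on them are hypothetical, and the rule that the updater rejects a task still in `CREATED` is an assumption made purely for illustration. The only behavior taken from the review itself is that `revive()` puts a closed task back into `CREATED`.

```java
import java.util.ArrayList;
import java.util.List;

public class ReviveSketch {

    // Toy lifecycle states; only CREATED and RESTORING are named in the review.
    enum State { CREATED, RESTORING, CLOSED }

    // Hypothetical stand-in for a task; not Kafka's StreamTask.
    static final class ToyTask {
        private State state = State.CREATED;

        State state() {
            return state;
        }

        // Assumption: initialization moves a freshly created task to RESTORING.
        void initializeIfNeeded() {
            if (state == State.CREATED) {
                state = State.RESTORING;
            }
        }

        void closeClean() {
            state = State.CLOSED;
        }

        // From the review: revive() sets the task to CREATED.
        void revive() {
            if (state != State.CLOSED) {
                throw new IllegalStateException("only a closed task can be revived");
            }
            state = State.CREATED;
        }
    }

    // Hypothetical stand-in for the state updater.
    static final class ToyStateUpdater {
        private final List<ToyTask> tasks = new ArrayList<>();

        // Assumption: a task that was never initialized must not be added.
        void add(final ToyTask task) {
            if (task.state() == State.CREATED) {
                throw new IllegalStateException("task was never initialized");
            }
            tasks.add(task);
        }

        int size() {
            return tasks.size();
        }
    }

    public static void main(final String[] args) {
        final ToyTask task = new ToyTask();
        final ToyStateUpdater updater = new ToyStateUpdater();

        task.initializeIfNeeded();
        task.closeClean();
        task.revive();
        System.out.println("state after revive: " + task.state());

        // Without this call the toy updater would reject the task.
        task.initializeIfNeeded();
        updater.add(task);
        System.out.println("tasks in updater: " + updater.size());
    }
}
```

In this toy model, removing the second `initializeIfNeeded()` call makes `add` throw, which mirrors the gap the comment points at. Whether the real state updater fails in the same way, or at all, is not established by this thread.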