This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new bfee3b3c6ba KAFKA-15690: Fix restoring tasks on partition loss, flaky EosIntegrationTest (#14869)

bfee3b3c6ba is described below

commit bfee3b3c6ba88a0c3392794b4b6d9f0093670f21
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Fri Dec 1 18:57:27 2023 +0100

    KAFKA-15690: Fix restoring tasks on partition loss, flaky EosIntegrationTest (#14869)

    The following race can happen in the state updater code path:

    1. A task is restoring and is owned by the state updater.
    2. We fall out of the consumer group and lose all partitions.
    3. We therefore register a "TaskManager.pendingUpdateAction" to CLOSE_DIRTY.
    4. We also register a "StateUpdater.taskAndAction" to remove the task.
    5. We get the same task reassigned. Since it is still owned by the state updater, we don't do much.
    6. The task completes restoration.
    7. The "StateUpdater.taskAndAction" to remove the task is ignored, since the task has already been restored.
    8. Inside "handleRestoredTasksFromStateUpdater", we close the task dirty because of the pending update action.
    9. We now have the task assigned, but it is closed.

    To fix this particular race, we cancel the "close" pending update action. Furthermore, since other threads may have made progress during the missed rebalance, we need to add the task back to the state updater to at least check whether we are still at the end of the changelog. Finally, it seems we do not need to close dirty here; it is enough to close clean when we lose the task (related to KAFKA-10532).

    This should fix the flaky EosIntegrationTest.
Reviewers: Bruno Cadonna <cado...@apache.org> --- .../processor/internals/PendingUpdateAction.java | 6 +-- .../streams/processor/internals/TaskManager.java | 36 ++++++++-------- .../kafka/streams/processor/internals/Tasks.java | 8 ++-- .../streams/processor/internals/TasksRegistry.java | 4 +- .../processor/internals/TaskManagerTest.java | 48 ++++++++++++++-------- .../streams/processor/internals/TasksTest.java | 36 ++++++++-------- 6 files changed, 77 insertions(+), 61 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java index c51921d8aea..679e84d6d0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java @@ -27,7 +27,7 @@ public class PendingUpdateAction { UPDATE_INPUT_PARTITIONS, RECYCLE, SUSPEND, - CLOSE_DIRTY, + ADD_BACK, CLOSE_CLEAN } @@ -57,8 +57,8 @@ public class PendingUpdateAction { return new PendingUpdateAction(Action.SUSPEND); } - public static PendingUpdateAction createCloseDirty() { - return new PendingUpdateAction(Action.CLOSE_DIRTY); + public static PendingUpdateAction createAddBack() { + return new PendingUpdateAction(Action.ADD_BACK); } public static PendingUpdateAction createCloseClean() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 9197a6b4b98..80ece039838 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -543,8 +543,19 @@ public class TaskManager { final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { final Set<TopicPartition> 
inputPartitions = activeTasksToCreate.get(taskId); - if (task.isActive()) { - updateInputPartitionsOrRemoveTaskFromTasksToSuspend(task, inputPartitions); + if (task.isActive() && !task.inputPartitions().equals(inputPartitions)) { + stateUpdater.remove(taskId); + tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions); + } else if (task.isActive()) { + tasks.removePendingActiveTaskToSuspend(taskId); + if (tasks.removePendingTaskToCloseClean(taskId)) { + log.info( + "We were planning on closing task {} because we lost one of its partitions." + + "The task got reassigned to this thread, so cancel closing of the task, but add it back to the " + + "state updater, since we may have to catch up on the changelog.", + taskId); + tasks.addPendingTaskToAddBack(taskId); + } } else { removeTaskToRecycleFromStateUpdater(taskId, inputPartitions); } @@ -560,17 +571,6 @@ public class TaskManager { } } - private void updateInputPartitionsOrRemoveTaskFromTasksToSuspend(final Task task, - final Set<TopicPartition> inputPartitions) { - final TaskId taskId = task.id(); - if (!task.inputPartitions().equals(inputPartitions)) { - stateUpdater.remove(taskId); - tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions); - } else { - tasks.removePendingActiveTaskToSuspend(taskId); - } - } - private void removeTaskToRecycleFromStateUpdater(final TaskId taskId, final Set<TopicPartition> inputPartitions) { stateUpdater.remove(taskId); @@ -913,10 +913,10 @@ public class TaskManager { Set<TopicPartition> inputPartitions; if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) { recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions); + } else if (tasks.removePendingTaskToAddBack(task.id())) { + stateUpdater.add(task); } else if (tasks.removePendingTaskToCloseClean(task.id())) { closeTaskClean(task, tasksToCloseDirty, taskExceptions); - } else if (tasks.removePendingTaskToCloseDirty(task.id())) { - 
tasksToCloseDirty.add(task); } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -949,8 +949,8 @@ public class TaskManager { recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions); } else if (tasks.removePendingTaskToCloseClean(task.id())) { closeTaskClean(task, tasksToCloseDirty, taskExceptions); - } else if (tasks.removePendingTaskToCloseDirty(task.id())) { - tasksToCloseDirty.add(task); + } else if (tasks.removePendingTaskToAddBack(task.id())) { + stateUpdater.add(task); } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); transitRestoredTaskToRunning(task, now, offsetResetter); @@ -1156,7 +1156,7 @@ public class TaskManager { if (stateUpdater != null) { for (final Task restoringTask : stateUpdater.getTasks()) { if (restoringTask.isActive()) { - tasks.addPendingTaskToCloseDirty(restoringTask.id()); + tasks.addPendingTaskToCloseClean(restoringTask.id()); stateUpdater.remove(restoringTask.id()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 00a9a27da3a..f87713ee532 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -134,8 +134,8 @@ class Tasks implements TasksRegistry { } @Override - public boolean removePendingTaskToCloseDirty(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) { + public boolean removePendingTaskToAddBack(final TaskId taskId) { + if (containsTaskIdWithAction(taskId, Action.ADD_BACK)) { pendingUpdateActions.remove(taskId); 
return true; } @@ -143,8 +143,8 @@ class Tasks implements TasksRegistry { } @Override - public void addPendingTaskToCloseDirty(final TaskId taskId) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseDirty()); + public void addPendingTaskToAddBack(final TaskId taskId) { + pendingUpdateActions.put(taskId, PendingUpdateAction.createAddBack()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java index 82fbf097776..18064797d3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java @@ -45,9 +45,9 @@ public interface TasksRegistry { void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions); - boolean removePendingTaskToCloseDirty(final TaskId taskId); + boolean removePendingTaskToAddBack(final TaskId taskId); - void addPendingTaskToCloseDirty(final TaskId taskId); + void addPendingTaskToAddBack(final TaskId taskId); boolean removePendingTaskToCloseClean(final TaskId taskId); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 8acc9566388..c5ba814de7a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -580,6 +580,27 @@ public class TaskManagerTest { Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } + @Test + public void shouldRemoveReassignedLostTaskInStateUpdaterFromPendingTaskToCloseClean() { + final StreamTask reassignedLostTask = statefulTask(taskId03, taskId03ChangelogPartitions) + .inState(State.RESTORING) + 
.withInputPartitions(taskId03Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedLostTask)); + when(tasks.removePendingTaskToCloseClean(reassignedLostTask.id())).thenReturn(true); + + taskManager.handleAssignment( + mkMap(mkEntry(reassignedLostTask.id(), reassignedLostTask.inputPartitions())), + Collections.emptyMap() + ); + + Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + Mockito.verify(tasks).removePendingTaskToCloseClean(reassignedLostTask.id()); + Mockito.verify(tasks).addPendingTaskToAddBack(reassignedLostTask.id()); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); + } + @Test public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() { final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions) @@ -1287,10 +1308,9 @@ public class TaskManagerTest { Mockito.verify(stateUpdater).remove(task1.id()); Mockito.verify(stateUpdater, never()).remove(task2.id()); Mockito.verify(stateUpdater).remove(task3.id()); - Mockito.verify(tasks).addPendingTaskToCloseDirty(task1.id()); - Mockito.verify(tasks, never()).addPendingTaskToCloseDirty(task2.id()); + Mockito.verify(tasks).addPendingTaskToCloseClean(task1.id()); Mockito.verify(tasks, never()).addPendingTaskToCloseClean(task2.id()); - Mockito.verify(tasks).addPendingTaskToCloseDirty(task3.id()); + Mockito.verify(tasks).addPendingTaskToCloseClean(task3.id()); } private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater, @@ -1495,24 +1515,20 @@ public class TaskManagerTest { } @Test - public void shouldCloseDirtyRestoredTask() { + public void shouldAddBackRestoredTask() { final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) .inState(State.RESTORING) 
.withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); - when(tasks.removePendingTaskToCloseDirty(statefulTask.id())).thenReturn(true); + when(tasks.removePendingTaskToAddBack(statefulTask.id())).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id()); - Mockito.verify(statefulTask).prepareCommit(); - Mockito.verify(statefulTask).suspend(); - Mockito.verify(statefulTask).closeDirty(); - Mockito.verify(statefulTask, never()).closeClean(); + Mockito.verify(stateUpdater).add(statefulTask); Mockito.verify(tasks, never()).removeTask(statefulTask); } @@ -1574,7 +1590,7 @@ public class TaskManagerTest { final StreamTask taskToCloseClean = statefulTask(taskId02, taskId02ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId02Partitions).build(); - final StreamTask taskToCloseDirty = statefulTask(taskId03, taskId03ChangelogPartitions) + final StreamTask taskToAddBack = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final StreamTask taskToUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) @@ -1592,9 +1608,9 @@ public class TaskManagerTest { when(tasks.removePendingTaskToCloseClean( argThat(taskId -> !taskId.equals(taskToCloseClean.id()))) ).thenReturn(false); - when(tasks.removePendingTaskToCloseDirty(taskToCloseDirty.id())).thenReturn(true); - when(tasks.removePendingTaskToCloseDirty( - argThat(taskId -> !taskId.equals(taskToCloseDirty.id()))) + 
when(tasks.removePendingTaskToAddBack(taskToAddBack.id())).thenReturn(true); + when(tasks.removePendingTaskToAddBack( + argThat(taskId -> !taskId.equals(taskToAddBack.id()))) ).thenReturn(false); when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId05Partitions); when(tasks.removePendingTaskToUpdateInputPartitions( @@ -1605,7 +1621,7 @@ public class TaskManagerTest { taskToTransitToRunning, taskToRecycle, taskToCloseClean, - taskToCloseDirty, + taskToAddBack, taskToUpdateInputPartitions )); @@ -1615,7 +1631,7 @@ public class TaskManagerTest { Mockito.verify(stateUpdater).add(recycledStandbyTask); Mockito.verify(stateUpdater).add(recycledStandbyTask); Mockito.verify(taskToCloseClean).closeClean(); - Mockito.verify(taskToCloseDirty).closeDirty(); + Mockito.verify(stateUpdater).add(taskToAddBack); Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java index c65756d41f4..ab955d956f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java @@ -152,7 +152,7 @@ public class TasksTest { assertTrue(tasks.hasPendingTasksToRecycle()); tasks.addPendingTaskToCloseClean(TASK_0_1); - tasks.addPendingTaskToCloseDirty(TASK_0_2); + tasks.addPendingTaskToAddBack(TASK_0_2); tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0)); tasks.addPendingActiveTaskToSuspend(TASK_1_2); assertTrue(tasks.hasPendingTasksToRecycle()); @@ -177,7 +177,7 @@ public class TasksTest { assertTrue(tasks.hasPendingTasksToInit()); tasks.addPendingTaskToCloseClean(TASK_0_1); - tasks.addPendingTaskToCloseDirty(TASK_0_2); + tasks.addPendingTaskToAddBack(TASK_0_2); 
tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0)); tasks.addPendingActiveTaskToSuspend(TASK_1_2); assertTrue(tasks.hasPendingTasksToInit()); @@ -209,13 +209,13 @@ public class TasksTest { } @Test - public void shouldAddAndRemovePendingTaskToCloseDirty() { - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + public void shouldAddAndRemovePendingTaskToAddBack() { + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); - tasks.addPendingTaskToCloseDirty(TASK_0_0); + tasks.addPendingTaskToAddBack(TASK_0_0); - assertTrue(tasks.removePendingTaskToCloseDirty(TASK_0_0)); - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0)); + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); } @Test @@ -232,7 +232,7 @@ public class TasksTest { public void onlyRemovePendingTaskToRecycleShouldRemoveTaskFromPendingUpdateActions() { tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); @@ -243,7 +243,7 @@ public class TasksTest { public void onlyRemovePendingTaskToUpdateInputPartitionsShouldRemoveTaskFromPendingUpdateActions() { tasks.addPendingTaskToUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); @@ -254,7 +254,7 @@ public class TasksTest { public void onlyRemovePendingTaskToCloseCleanShouldRemoveTaskFromPendingUpdateActions() { 
tasks.addPendingTaskToCloseClean(TASK_0_0); - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); @@ -262,14 +262,14 @@ public class TasksTest { } @Test - public void onlyRemovePendingTaskToCloseDirtyShouldRemoveTaskFromPendingUpdateActions() { - tasks.addPendingTaskToCloseDirty(TASK_0_0); + public void onlyRemovePendingTaskToAddBackShouldRemoveTaskFromPendingUpdateActions() { + tasks.addPendingTaskToAddBack(TASK_0_0); assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); - assertTrue(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0)); } @Test @@ -277,7 +277,7 @@ public class TasksTest { tasks.addPendingActiveTaskToSuspend(TASK_0_0); assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); @@ -296,13 +296,13 @@ public class TasksTest { assertTrue(tasks.removePendingTaskToCloseClean(TASK_0_0)); tasks.addPendingTaskToCloseClean(TASK_0_0); - tasks.addPendingTaskToCloseDirty(TASK_0_0); + tasks.addPendingTaskToAddBack(TASK_0_0); assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); - assertTrue(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0)); - tasks.addPendingTaskToCloseDirty(TASK_0_0); + 
tasks.addPendingTaskToAddBack(TASK_0_0); tasks.addPendingActiveTaskToSuspend(TASK_0_0); - assertFalse(tasks.removePendingTaskToCloseDirty(TASK_0_0)); + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); tasks.addPendingActiveTaskToSuspend(TASK_0_0);