This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch timeout_state_updater in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2f5b04912ab4c3f12f74c2bd298fb02235da3131 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Dec 15 15:29:03 2025 +0100 KAFKA-19994: TaskManager may not close all tasks on task timeouts When a TimeoutException occurs while trying to transition multiple restored active tasks to running, we add the timed-out task back to the state updater so that we retry it. However, if we run into a task timeout (failing to make progress for a long time), we rethrow a StreamsException wrapping the TimeoutException. If we have drained multiple tasks from the state updater, the remaining tasks are lost: they are not added back to the state updater and are therefore not closed correctly. The task directories remain locked, causing issues when trying to replace the stream thread. --- .../streams/processor/internals/TaskManager.java | 24 +++++++++-- .../processor/internals/TaskManagerTest.java | 46 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c1b1c06379e..b7d728226ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -57,6 +57,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1046,9 +1047,12 @@ public class TaskManager { } } + /** + * @throws StreamsException if fetching committed offsets timed out often enough to exceed task timeout + */ private void transitRestoredTaskToRunning(final Task task, final long now, - final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { + final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws 
StreamsException { try { task.completeRestoration(offsetResetter); tasks.addTask(task); @@ -1134,8 +1138,22 @@ public class TaskManager { private void handleRestoredTasksFromStateUpdater(final long now, final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { final Duration timeout = Duration.ZERO; - for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) { - transitRestoredTaskToRunning(task, now, offsetResetter); + // Create a mutable copy to support iterator.remove() + final Set<StreamTask> restoredTasks = new LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout)); + final Iterator<StreamTask> iterator = restoredTasks.iterator(); + + try { + while (iterator.hasNext()) { + final Task task = iterator.next(); + transitRestoredTaskToRunning(task, now, offsetResetter); + iterator.remove(); // Remove successfully transitioned tasks + } + } finally { + // Add back any tasks that we drained but didn't successfully transition + // from the state updater, so that they are closed during shutdown. 
+ for (final Task task : restoredTasks) { + stateUpdater.add(task); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 3e87eebe733..a5403d425eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1620,6 +1620,52 @@ public class TaskManagerTest { verifyNoInteractions(consumer); } + @Test + public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() { + final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId01Partitions).build(); + final StreamTask task3 = statefulTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId02Partitions).build(); + + // Use LinkedHashSet to ensure predictable iteration order + final Set<StreamTask> restoredTasks = new java.util.LinkedHashSet<>(); + restoredTasks.add(task1); + restoredTasks.add(task2); + restoredTasks.add(task3); + + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(restoredTasks, tasks); + + // task1 completes successfully, task2 throws StreamsException from maybeInitTaskTimeoutOrThrow + // task3 is never processed because task2 throws + final TimeoutException timeoutException = new TimeoutException(); + doThrow(timeoutException).when(task2).completeRestoration(noOpResetter); + doThrow(new StreamsException("Task timeout exceeded", task2.id())).when(task2).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException)); + + assertThrows(StreamsException.class, () 
-> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); + + // task1 should be successfully transitioned + verify(tasks).addTask(task1); + verify(consumer).resume(task1.inputPartitions()); + verify(task1).clearTaskTimeout(); + + // task2 should be added back to state updater once in the finally block + // (the add in the catch block doesn't execute because maybeInitTaskTimeoutOrThrow throws) + verify(stateUpdater).add(task2); + verify(tasks, never()).addTask(task2); + verify(task2, never()).clearTaskTimeout(); + + // task3 should also be added back to state updater in the finally block + verify(stateUpdater).add(task3); + verify(tasks, never()).addTask(task3); + verify(task3, never()).clearTaskTimeout(); + } + private TaskManager setUpTransitionToRunningOfRestoredTask(final Set<StreamTask> statefulTasks, final TasksRegistry tasks) { when(stateUpdater.restoresActiveTasks()).thenReturn(true);
