This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch timeout_state_updater
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2f5b04912ab4c3f12f74c2bd298fb02235da3131
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Dec 15 15:29:03 2025 +0100

    KAFKA-19994: TaskManager may not close all tasks on task timeouts
    
    When a TimeoutException occurs while transitioning multiple restored active
    tasks back to running, we add the timed-out task back to the state updater
    so that it is retried.
    
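    However, if we run into a task timeout (failing to make progress for a long
    time), we rethrow a StreamsException wrapping the TimeoutException. If we
    have drained multiple tasks from the state updater at that point, the tasks
    that were not yet transitioned are lost: they are not added back to the
    state updater and therefore are not closed correctly. Their task
    directories remain locked, which causes failures when trying to replace the
    stream thread. To fix this, any drained tasks that were not successfully
    transitioned to running are now added back to the state updater in a
    finally block.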
---
 .../streams/processor/internals/TaskManager.java   | 24 +++++++++--
 .../processor/internals/TaskManagerTest.java       | 46 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c1b1c06379e..b7d728226ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1046,9 +1047,12 @@ public class TaskManager {
         }
     }
 
+    /**
+     * @throws StreamsException if fetching committed offsets timed out often enough to exceed task timeout
+     */
     private void transitRestoredTaskToRunning(final Task task,
                                               final long now,
-                                              final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+                                              final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws StreamsException {
         try {
             task.completeRestoration(offsetResetter);
             tasks.addTask(task);
@@ -1134,8 +1138,22 @@ public class TaskManager {
     private void handleRestoredTasksFromStateUpdater(final long now,
                                                      final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         final Duration timeout = Duration.ZERO;
-        for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
-            transitRestoredTaskToRunning(task, now, offsetResetter);
+        // Create a mutable copy to support iterator.remove()
+        final Set<StreamTask> restoredTasks = new LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout));
+        final Iterator<StreamTask> iterator = restoredTasks.iterator();
+
+        try {
+            while (iterator.hasNext()) {
+                final Task task = iterator.next();
+                transitRestoredTaskToRunning(task, now, offsetResetter);
+                iterator.remove(); // Remove successfully transitioned tasks
+            }
+        } finally {
+            // Add back any tasks that we drained from the state updater but did not successfully
+            // transition to running, so that they are still closed during shutdown.
+            for (final Task task : restoredTasks) {
+                stateUpdater.add(task);
+            }
         }
     }
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3e87eebe733..a5403d425eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1620,6 +1620,52 @@ public class TaskManagerTest {
         verifyNoInteractions(consumer);
     }
 
+    @Test
+    public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() {
+        final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final StreamTask task3 = statefulTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId02Partitions).build();
+
+        // Use LinkedHashSet to ensure predictable iteration order
+        final Set<StreamTask> restoredTasks = new java.util.LinkedHashSet<>();
+        restoredTasks.add(task1);
+        restoredTasks.add(task2);
+        restoredTasks.add(task3);
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(restoredTasks, tasks);
+
+        // task1 completes successfully; task2 times out in completeRestoration, and then
+        // maybeInitTaskTimeoutOrThrow throws StreamsException, so task3 is never processed
+        final TimeoutException timeoutException = new TimeoutException();
+        doThrow(timeoutException).when(task2).completeRestoration(noOpResetter);
+        doThrow(new StreamsException("Task timeout exceeded", task2.id())).when(task2).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
+
+        assertThrows(StreamsException.class, () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
+
+        // task1 should be successfully transitioned
+        verify(tasks).addTask(task1);
+        verify(consumer).resume(task1.inputPartitions());
+        verify(task1).clearTaskTimeout();
+
+        // task2 should be added back to state updater once in the finally block
+        // (the add in the catch block doesn't execute because maybeInitTaskTimeoutOrThrow throws)
+        verify(stateUpdater).add(task2);
+        verify(tasks, never()).addTask(task2);
+        verify(task2, never()).clearTaskTimeout();
+
+        // task3 should also be added back to state updater in the finally block
+        verify(stateUpdater).add(task3);
+        verify(tasks, never()).addTask(task3);
+        verify(task3, never()).clearTaskTimeout();
+    }
+
     private TaskManager setUpTransitionToRunningOfRestoredTask(final Set<StreamTask> statefulTasks,
                                                                final TasksRegistry tasks) {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);

Reply via email to