This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 0b75b5002c4 KAFKA-19994: TaskManager may not close all tasks on task 
timeouts (#21155)
0b75b5002c4 is described below

commit 0b75b5002c4268791bc1d8b45e8506d341796065
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Dec 16 09:40:24 2025 +0100

    KAFKA-19994: TaskManager may not close all tasks on task timeouts (#21155)
    
    When a TimeoutException occurs while trying to put multiple active tasks
    back into running, we add the timed-out task back to the state updater
    so that we retry it.
    
    However, if we run into a task timeout (failing to make progress for a
    long time), we rethrow a StreamsException wrapping the TimeoutException.
    If we have drained multiple tasks from the state updater at that point,
    the remaining drained tasks (the one that timed out and any that were not
    yet processed) are lost: they are not added back to the state updater
    and therefore are not closed correctly. The task directories remain
    locked, causing issues when trying to replace the stream thread.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   | 24 +++++++++--
 .../processor/internals/TaskManagerTest.java       | 46 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 064662ee01f..fab9db663b5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -983,9 +984,12 @@ public class TaskManager {
         }
     }
 
+    /**
+     * @throws StreamsException if fetching committed offsets timed out often 
enough to exceed task timeout
+     */
     private void transitRestoredTaskToRunning(final Task task,
                                               final long now,
-                                              final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+                                              final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws 
StreamsException {
         try {
             task.completeRestoration(offsetResetter);
             tasks.addTask(task);
@@ -1071,8 +1075,22 @@ public class TaskManager {
     private void handleRestoredTasksFromStateUpdater(final long now,
                                                      final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         final Duration timeout = Duration.ZERO;
-        for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) 
{
-            transitRestoredTaskToRunning(task, now, offsetResetter);
+        // Create a mutable copy to support iterator.remove()
+        final Set<StreamTask> restoredTasks = new 
LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout));
+        final Iterator<StreamTask> iterator = restoredTasks.iterator();
+
+        try {
+            while (iterator.hasNext()) {
+                final Task task = iterator.next();
+                transitRestoredTaskToRunning(task, now, offsetResetter);
+                iterator.remove(); // Remove successfully transitioned tasks
+            }
+        } finally {
+            // Add back any tasks that we drained but didn't successfully 
transition
+            // from the state updater, so that they are closed during shutdown.
+            for (final Task task : restoredTasks) {
+                stateUpdater.add(task);
+            }
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 4f13c41ba31..77e6aae46df 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1620,6 +1620,52 @@ public class TaskManagerTest {
         verifyNoInteractions(consumer);
     }
 
+    @Test
+    public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() {
+        final StreamTask task1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task2 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final StreamTask task3 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId02Partitions).build();
+
+        // Use LinkedHashSet to ensure predictable iteration order
+        final Set<StreamTask> restoredTasks = new java.util.LinkedHashSet<>();
+        restoredTasks.add(task1);
+        restoredTasks.add(task2);
+        restoredTasks.add(task3);
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTransitionToRunningOfRestoredTask(restoredTasks, tasks);
+
+        // task1 completes successfully, task2 throws StreamsException from 
maybeInitTaskTimeoutOrThrow
+        // task3 is never processed because task2 throws
+        final TimeoutException timeoutException = new TimeoutException();
+        
doThrow(timeoutException).when(task2).completeRestoration(noOpResetter);
+        doThrow(new StreamsException("Task timeout exceeded", 
task2.id())).when(task2).maybeInitTaskTimeoutOrThrow(anyLong(), 
eq(timeoutException));
+
+        assertThrows(StreamsException.class, () -> 
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
+
+        // task1 should be successfully transitioned
+        verify(tasks).addTask(task1);
+        verify(consumer).resume(task1.inputPartitions());
+        verify(task1).clearTaskTimeout();
+
+        // task2 should be added back to state updater once in the finally 
block
+        // (the add in the catch block doesn't execute because 
maybeInitTaskTimeoutOrThrow throws)
+        verify(stateUpdater).add(task2);
+        verify(tasks, never()).addTask(task2);
+        verify(task2, never()).clearTaskTimeout();
+
+        // task3 should also be added back to state updater in the finally 
block
+        verify(stateUpdater).add(task3);
+        verify(tasks, never()).addTask(task3);
+        verify(task3, never()).clearTaskTimeout();
+    }
+
     private TaskManager setUpTransitionToRunningOfRestoredTask(final 
Set<StreamTask> statefulTasks,
                                                                final 
TasksRegistry tasks) {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);

Reply via email to