This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f40b51d623456420aa7e4cf1dc87d47acefe875
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:40 2026 +0100

    [FLINK-38543][checkpoint] Fix Mailbox loop interrupted before recovery finished
    
    Return allOf future instead of thenRun future. thenRun() returns a NEW future that
    completes only after the callback finishes. CompletableFuture executes thenRun callbacks
    synchronously on the thread that calls complete(). When recoveredFutures contains
    bufferFilteringCompleteFuture (checkpointingDuringRecovery enabled), complete() is called
    on channelIOExecutor (in finishReadRecoveredState), so thenRun(suspend) also runs on
    channelIOExecutor. suspend() sends a poison mail, and the mailbox thread can pick it up
    and exit runMailboxLoop() before the thenRun future completes — causing checkState(isDone)
    to fail. With stateConsumedFuture (the default), complete() runs on the mailbox thread
    itself, so thenRun(suspend) blocks the loop from processing the poison mail until the
    future completes — no race. Returning allOf future avoids the issue entirely.
---
 .../flink/streaming/runtime/tasks/StreamTask.java       | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ec03137842..5ca9a5662e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -913,8 +913,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                             "Input gate request partitions"));
         }
 
-        return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
-                .thenRun(mailboxProcessor::suspend);
+        // Return allOf future instead of thenRun future. thenRun() returns a NEW future that
+        // completes only after the callback finishes. CompletableFuture executes thenRun callbacks
+        // synchronously on the thread that calls complete(). When recoveredFutures contains
+        // bufferFilteringCompleteFuture (checkpointingDuringRecovery enabled), complete() is called
+        // on channelIOExecutor (in finishReadRecoveredState), so thenRun(suspend) also runs on
+        // channelIOExecutor. suspend() sends a poison mail, and the mailbox thread can pick it up
+        // and exit runMailboxLoop() before the thenRun future completes — causing
+        // checkState(isDone) to fail. With stateConsumedFuture (the default), complete() runs on
+        // the mailbox thread itself, so thenRun(suspend) blocks the loop from processing the poison
+        // mail until the future completes — no race. Returning allOf future avoids the issue
+        // entirely.
+        CompletableFuture<Void> allRecoveredFuture =
+                CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]));
+        allRecoveredFuture.thenRun(mailboxProcessor::suspend);
+        return allRecoveredFuture;
     }
 
     private void ensureNotCanceled() {
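
The distinction the commit message draws between the allOf future and the
future returned by thenRun() can be reproduced outside Flink. The following is
a minimal sketch, not Flink code: the class ThenRunFutureDemo, the Observation
holder and the variable names are hypothetical, and only java.util.concurrent
is used. It records, from inside the thenRun callback, whether each of the two
futures is already done.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it only illustrates CompletableFuture semantics.
class ThenRunFutureDemo {

    /** What was observed while and after the thenRun callback executed. */
    static final class Observation {
        boolean allOfDoneInsideCallback;
        boolean thenRunDoneInsideCallback;
        boolean thenRunDoneAfterCallback;
        boolean callbackRanOnCompletingThread;
    }

    static Observation observe() {
        Observation obs = new Observation();

        // Stands in for one entry of recoveredFutures (an assumption of this sketch).
        CompletableFuture<Void> recovered = new CompletableFuture<>();
        CompletableFuture<Void> all = CompletableFuture.allOf(recovered);

        // The callback needs a handle to the future that thenRun() returns.
        AtomicReference<CompletableFuture<Void>> derivedRef = new AtomicReference<>();
        Thread[] callbackThread = new Thread[1];

        CompletableFuture<Void> derived =
                all.thenRun(
                        () -> {
                            // In StreamTask this is where suspend() would be called.
                            callbackThread[0] = Thread.currentThread();
                            obs.allOfDoneInsideCallback = all.isDone();
                            obs.thenRunDoneInsideCallback = derivedRef.get().isDone();
                        });
        derivedRef.set(derived);

        // Completing the source runs the callback synchronously on this thread.
        recovered.complete(null);

        obs.thenRunDoneAfterCallback = derived.isDone();
        obs.callbackRanOnCompletingThread = callbackThread[0] == Thread.currentThread();
        return obs;
    }

    public static void main(String[] args) {
        Observation obs = observe();
        System.out.println("allOf done inside callback:   " + obs.allOfDoneInsideCallback);
        System.out.println("thenRun done inside callback: " + obs.thenRunDoneInsideCallback);
        System.out.println("thenRun done after callback:  " + obs.thenRunDoneAfterCallback);
        System.out.println("callback on completing thread: " + obs.callbackRanOnCompletingThread);
    }
}
```

While the callback is running, the allOf future is already done but the future
returned by thenRun() is not. If another thread reacts to a side effect of the
callback and then checks isDone(), the check can therefore fail on the
thenRun() future but not on the allOf future. That is the window the commit
closes by returning the allOf future.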
