This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f40b51d623456420aa7e4cf1dc87d47acefe875
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:40 2026 +0100

    [FLINK-38543][checkpoint] Fix Mailbox loop interrupted before recovery finished

    Return allOf future instead of thenRun future.

    thenRun() returns a NEW future that completes only after the callback
    finishes. CompletableFuture executes thenRun callbacks synchronously on
    the thread that calls complete().

    When recoveredFutures contains bufferFilteringCompleteFuture
    (checkpointingDuringRecovery enabled), complete() is called on
    channelIOExecutor (in finishReadRecoveredState), so thenRun(suspend)
    also runs on channelIOExecutor. suspend() sends a poison mail, and the
    mailbox thread can pick it up and exit runMailboxLoop() before the
    thenRun future completes — causing checkState(isDone) to fail.

    With stateConsumedFuture (the default), complete() runs on the mailbox
    thread itself, so thenRun(suspend) blocks the loop from processing the
    poison mail until the future completes — no race.

    Returning allOf future avoids the issue entirely.
---
 .../flink/streaming/runtime/tasks/StreamTask.java | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ec03137842..5ca9a5662e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -913,8 +913,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                             "Input gate request partitions"));
         }
 
-        return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
-                .thenRun(mailboxProcessor::suspend);
+        // Return allOf future instead of thenRun future. thenRun() returns a NEW future that
+        // completes only after the callback finishes. CompletableFuture executes thenRun callbacks
+        // synchronously on the thread that calls complete(). When recoveredFutures contains
+        // bufferFilteringCompleteFuture (checkpointingDuringRecovery enabled), complete() is called
+        // on channelIOExecutor (in finishReadRecoveredState), so thenRun(suspend) also runs on
+        // channelIOExecutor. suspend() sends a poison mail, and the mailbox thread can pick it up
+        // and exit runMailboxLoop() before the thenRun future completes — causing
+        // checkState(isDone) to fail. With stateConsumedFuture (the default), complete() runs on
+        // the mailbox thread itself, so thenRun(suspend) blocks the loop from processing the poison
+        // mail until the future completes — no race. Returning allOf future avoids the issue
+        // entirely.
+        CompletableFuture<Void> allRecoveredFuture =
+                CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]));
+        allRecoveredFuture.thenRun(mailboxProcessor::suspend);
+        return allRecoveredFuture;
     }
 
     private void ensureNotCanceled() {
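
The ordering described in the commit message can be observed with plain java.util.concurrent, outside Flink. What follows is a minimal sketch, not Flink code: the class ThenRunVsAllOf and its observe() method are hypothetical names chosen for illustration, and a single thread stands in for the channelIOExecutor and mailbox thread pair. It only shows that, while a thenRun callback is executing, the allOf future is already done but the new future returned by thenRun() is not.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for illustration; not part of Flink.
class ThenRunVsAllOf {

    /**
     * Returns three observations:
     * [0] whether the allOf future is done while the callback runs,
     * [1] whether the thenRun future is done while the callback runs,
     * [2] whether the thenRun future is done after complete() has returned.
     */
    static boolean[] observe() {
        // Stands in for one entry of recoveredFutures.
        CompletableFuture<Void> recovered = new CompletableFuture<>();
        CompletableFuture<Void> all = CompletableFuture.allOf(recovered);

        // Holder so the callback can inspect the future that thenRun() returns.
        AtomicReference<CompletableFuture<Void>> derived = new AtomicReference<>();
        boolean[] seen = new boolean[3];

        // The callback plays the role of mailboxProcessor::suspend.
        derived.set(all.thenRun(() -> {
            seen[0] = all.isDone();
            seen[1] = derived.get().isDone();
        }));

        // The callback runs synchronously on the thread that calls complete().
        recovered.complete(null);
        seen[2] = derived.get().isDone();
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = observe();
        System.out.println("allOf done inside callback:    " + seen[0]);
        System.out.println("thenRun done inside callback:  " + seen[1]);
        System.out.println("thenRun done after complete(): " + seen[2]);
    }
}
```

In this sketch the window between the callback starting and the thenRun future completing is where, per the commit message, another thread could already have acted on the callback's side effect. A caller that checks isDone() on the allOf future is not exposed to that window, since the allOf future is complete before the callback is invoked.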
