This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aaab864c24d77979f8ae06771f30024ed323547e
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:40 2026 +0100

    [FLINK-38543][checkpoint] Introduce bufferFilteringCompleteFuture for earlier RUNNING state transition
---
 .../io/network/partition/consumer/InputGate.java   |   7 +
 .../partition/consumer/RecoveredInputChannel.java  |  41 +++++-
 .../partition/consumer/SingleInputGate.java        |  15 +++
 .../network/partition/consumer/UnionInputGate.java |   9 ++
 .../runtime/taskmanager/InputGateWithMetrics.java  |   5 +
 .../consumer/RecoveredInputChannelTest.java        | 143 ++++++++++++++++++++-
 .../streaming/runtime/io/MockIndexedInputGate.java |   5 +
 .../flink/streaming/runtime/io/MockInputGate.java  |   5 +
 .../AlignedCheckpointsMassiveRandomTest.java       |   5 +
 9 files changed, 225 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 11d22a8df4a..dd744bae330 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -192,5 +192,12 @@ public abstract class InputGate
 
     public abstract CompletableFuture<Void> getStateConsumedFuture();
 
+    /**
+     * Returns a future that completes when buffer filtering is complete for all channels. This
+     * future completes before {@link #getStateConsumedFuture()}, enabling earlier RUNNING state
+     * transition when unaligned checkpoint during recovery is enabled.
+     */
+    public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture();
+
     public abstract void finishReadRecoveredState() throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index d2a7a07137d..d9b7885815b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -62,6 +62,13 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
     private final CompletableFuture<?> stateConsumedFuture = new 
CompletableFuture<>();
     protected final BufferManager bufferManager;
 
+    /**
+     * Future that completes when recovered buffers have been filtered for this channel. This
+     * completes before stateConsumedFuture, enabling earlier RUNNING state transition when
+     * unaligned checkpoint during recovery is enabled.
+     */
+    private final CompletableFuture<Void> bufferFilteringCompleteFuture = new 
CompletableFuture<>();
+
     @GuardedBy("receivedBuffers")
     private boolean isReleased;
 
@@ -110,7 +117,11 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
 
     public final InputChannel toInputChannel() throws IOException {
         Preconditions.checkState(
-                stateConsumedFuture.isDone(), "recovered state is not fully 
consumed");
+                bufferFilteringCompleteFuture.isDone(), "buffer filtering is 
not complete");
+        if (!inputGate.isCheckpointingDuringRecoveryEnabled()) {
+            Preconditions.checkState(
+                    stateConsumedFuture.isDone(), "recovered state is not 
fully consumed");
+        }
 
         // Extract remaining buffers before conversion.
         // These buffers have been filtered but not yet consumed by the Task.
@@ -140,6 +151,14 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
     protected abstract InputChannel toInputChannelInternal(ArrayDeque<Buffer> 
remainingBuffers)
             throws IOException;
 
+    /**
+     * Returns the future that completes when buffer filtering is complete. This future completes
+     * before stateConsumedFuture, at the point when finishReadRecoveredState() is called.
+     */
+    CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+        return bufferFilteringCompleteFuture;
+    }
+
     CompletableFuture<?> getStateConsumedFuture() {
         return stateConsumedFuture;
     }
@@ -176,8 +195,22 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
     }
 
     public void finishReadRecoveredState() throws IOException {
-        onRecoveredStateBuffer(
-                EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, 
false));
+        // Adding the event and completing the future must be atomic under receivedBuffers lock.
+        // Without this, either ordering has a race:
+        // - event first: task thread consumes EndOfInputChannelStateEvent, which completes
+        //   stateConsumedFuture. When checkpointing during recovery is disabled,
+        //   stateConsumedFuture triggers requestPartitions -> toInputChannel(), which
+        //   fails because bufferFilteringCompleteFuture is not yet done.
+        // - future first: toInputChannel() extracts buffers before the event is added,
+        //   losing the EndOfInputChannelStateEvent.
+        // Both toInputChannel() and getNextRecoveredStateBuffer() synchronize on
+        // receivedBuffers, so holding the same lock here guarantees
+        // bufferFilteringCompleteFuture is always done before stateConsumedFuture.
+        synchronized (receivedBuffers) {
+            onRecoveredStateBuffer(
+                    
EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false));
+            bufferFilteringCompleteFuture.complete(null);
+        }
         bufferManager.releaseFloatingBuffers();
         LOG.debug("{}/{} finished recovering input.", 
inputGate.getOwningTaskName(), channelInfo);
     }
@@ -196,6 +229,8 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
         if (next == null) {
             return null;
         } else if (isEndOfInputChannelStateEvent(next)) {
+            Preconditions.checkState(
+                    bufferFilteringCompleteFuture.isDone(), "buffer filtering 
is not complete");
             stateConsumedFuture.complete(null);
             return null;
         } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2847e36fcc2..438efa2f58b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -341,6 +341,21 @@ public class SingleInputGate extends IndexedInputGate {
         return checkpointingDuringRecoveryEnabled;
     }
 
+    @Override
+    public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+        synchronized (requestLock) {
+            List<CompletableFuture<?>> futures = new 
ArrayList<>(numberOfInputChannels);
+            for (InputChannel inputChannel : inputChannels()) {
+                if (inputChannel instanceof RecoveredInputChannel) {
+                    futures.add(
+                            ((RecoveredInputChannel) inputChannel)
+                                    .getBufferFilteringCompleteFuture());
+                }
+            }
+            return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0]));
+        }
+    }
+
     @Override
     public void requestPartitions() {
         synchronized (requestLock) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 6c7c765938b..dda71c63be3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -350,6 +350,15 @@ public class UnionInputGate extends InputGate {
                         .toArray(new CompletableFuture[] {}));
     }
 
+    @Override
+    public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+        return CompletableFuture.allOf(
+                inputGatesByGateIndex.values().stream()
+                        .map(InputGate::getBufferFilteringCompleteFuture)
+                        .collect(Collectors.toList())
+                        .toArray(new CompletableFuture[] {}));
+    }
+
     @Override
     public void requestPartitions() throws IOException {
         for (InputGate inputGate : inputGatesByGateIndex.values()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 31775fb21de..bff412f53b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -120,6 +120,11 @@ public class InputGateWithMetrics extends IndexedInputGate 
{
         return inputGate.getStateConsumedFuture();
     }
 
+    @Override
+    public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+        return inputGate.getBufferFilteringCompleteFuture();
+    }
+
     @Override
     public void requestPartitions() throws IOException {
         inputGate.requestPartitions();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
index 5985a81e8ca..f40fd09702e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
@@ -28,24 +28,30 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
 import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link RecoveredInputChannel}. */
 class RecoveredInputChannelTest {
 
     @Test
-    void testConversionOnlyPossibleAfterConsumed() {
-        assertThatThrownBy(() -> buildChannel().toInputChannel())
-                .isInstanceOf(IllegalStateException.class);
+    void testConversionOnlyPossibleAfterBufferFilteringComplete() {
+        // toInputChannel() always checks bufferFilteringCompleteFuture 
regardless of config
+        for (boolean configEnabled : new boolean[] {true, false}) {
+            assertThatThrownBy(() -> 
buildChannel(configEnabled).toInputChannel())
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("buffer filtering is not complete");
+        }
     }
 
     @Test
     void testRequestPartitionsImpossible() {
-        assertThatThrownBy(() -> buildChannel().requestSubpartitions())
+        assertThatThrownBy(() -> buildChannel(false).requestSubpartitions())
                 .isInstanceOf(UnsupportedOperationException.class);
     }
 
@@ -53,7 +59,7 @@ class RecoveredInputChannelTest {
     void testCheckpointStartImpossible() {
         assertThatThrownBy(
                         () ->
-                                buildChannel()
+                                buildChannel(false)
                                         .checkpointStarted(
                                                 new CheckpointBarrier(
                                                         0L,
@@ -64,10 +70,96 @@ class RecoveredInputChannelTest {
                 .isInstanceOf(CheckpointException.class);
     }
 
-    private RecoveredInputChannel buildChannel() {
+    @Test
+    void 
testToInputChannelAllowedWhenBufferFilteringCompleteAndConfigEnabled() throws 
IOException {
+        // When config is enabled, conversion is allowed when 
bufferFilteringCompleteFuture is done
+        TestableRecoveredInputChannel channel = buildTestableChannel(true);
+
+        // Initially, conversion should fail
+        assertThatThrownBy(() -> channel.toInputChannel())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("buffer filtering is not complete");
+
+        // After finishReadRecoveredState(), bufferFilteringCompleteFuture 
should be done
+        channel.finishReadRecoveredState();
+        assertThat(channel.getBufferFilteringCompleteFuture()).isDone();
+        assertThat(channel.getStateConsumedFuture()).isNotDone();
+
+        // Conversion should now succeed (no exception)
+        InputChannel converted = channel.toInputChannel();
+        assertThat(converted).isNotNull();
+    }
+
+    @Test
+    void testToInputChannelAllowedWhenStateConsumedAndConfigDisabled() throws 
IOException {
+        // When config is disabled, conversion requires both 
bufferFilteringCompleteFuture
+        // and stateConsumedFuture to be done
+        TestableRecoveredInputChannel channel = buildTestableChannel(false);
+
+        // Initially, conversion should fail (buffer filtering not complete)
+        assertThatThrownBy(() -> channel.toInputChannel())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("buffer filtering is not complete");
+
+        // After finishReadRecoveredState(), bufferFilteringCompleteFuture is 
done
+        // but stateConsumedFuture is not
+        channel.finishReadRecoveredState();
+        assertThat(channel.getBufferFilteringCompleteFuture()).isDone();
+        assertThat(channel.getStateConsumedFuture()).isNotDone();
+
+        // Conversion should still fail because stateConsumedFuture is not done
+        assertThatThrownBy(() -> channel.toInputChannel())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("recovered state is not fully consumed");
+
+        // Consume the EndOfInputChannelStateEvent to complete 
stateConsumedFuture
+        assertThat(channel.getNextBuffer()).isNotPresent();
+        assertThat(channel.getStateConsumedFuture()).isDone();
+
+        // Now conversion should succeed
+        InputChannel converted = channel.toInputChannel();
+        assertThat(converted).isNotNull();
+    }
+
+    @Test
+    void testBufferFilteringCompleteFutureAlwaysCompletes() throws IOException 
{
+        // finishReadRecoveredState() unconditionally completes 
bufferFilteringCompleteFuture
+        for (boolean configEnabled : new boolean[] {true, false}) {
+            RecoveredInputChannel channel = buildChannel(configEnabled);
+            assertThat(channel.getBufferFilteringCompleteFuture()).isNotDone();
+            channel.finishReadRecoveredState();
+            assertThat(channel.getBufferFilteringCompleteFuture()).isDone();
+        }
+    }
+
+    @Test
+    void testStateConsumedFutureCompletesAfterConsumingAllBuffers() throws 
IOException {
+        // This test verifies that stateConsumedFuture completes after 
consuming
+        // EndOfInputChannelStateEvent regardless of the config setting
+        for (boolean configEnabled : new boolean[] {true, false}) {
+            RecoveredInputChannel channel = buildChannel(configEnabled);
+
+            assertThat(channel.getStateConsumedFuture()).isNotDone();
+
+            channel.finishReadRecoveredState();
+            assertThat(channel.getStateConsumedFuture()).isNotDone();
+
+            // Consuming the EndOfInputChannelStateEvent should complete the 
future.
+            // getNextBuffer() returns empty when it encounters the event 
internally.
+            assertThat(channel.getNextBuffer()).isNotPresent();
+            assertThat(channel.getStateConsumedFuture()).isDone();
+        }
+    }
+
+    private RecoveredInputChannel buildChannel(boolean 
checkpointingDuringRecoveryEnabled) {
         try {
+            SingleInputGate inputGate =
+                    new SingleInputGateBuilder()
+                            .setCheckpointingDuringRecoveryEnabled(
+                                    checkpointingDuringRecoveryEnabled)
+                            .build();
             return new RecoveredInputChannel(
-                    new SingleInputGateBuilder().build(),
+                    inputGate,
                     0,
                     new ResultPartitionID(),
                     new ResultSubpartitionIndexSet(0),
@@ -85,4 +177,41 @@ class RecoveredInputChannelTest {
             throw new AssertionError("channel creation failed", e);
         }
     }
+
+    private TestableRecoveredInputChannel buildTestableChannel(
+            boolean checkpointingDuringRecoveryEnabled) {
+        try {
+            SingleInputGate inputGate =
+                    new SingleInputGateBuilder()
+                            .setCheckpointingDuringRecoveryEnabled(
+                                    checkpointingDuringRecoveryEnabled)
+                            .build();
+            return new TestableRecoveredInputChannel(inputGate);
+        } catch (Exception e) {
+            throw new AssertionError("channel creation failed", e);
+        }
+    }
+
+    /**
+     * A RecoveredInputChannel that returns a TestInputChannel when converted, 
for testing purposes.
+     */
+    private static class TestableRecoveredInputChannel extends 
RecoveredInputChannel {
+        TestableRecoveredInputChannel(SingleInputGate inputGate) {
+            super(
+                    inputGate,
+                    0,
+                    new ResultPartitionID(),
+                    new ResultSubpartitionIndexSet(0),
+                    0,
+                    0,
+                    new SimpleCounter(),
+                    new SimpleCounter(),
+                    10);
+        }
+
+        @Override
+        protected InputChannel toInputChannelInternal(ArrayDeque<Buffer> 
remainingBuffers) {
+            return new TestInputChannel(inputGate, 0);
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index 618f31e518f..584aeb7eb90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -57,6 +57,11 @@ public class MockIndexedInputGate extends IndexedInputGate {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public void finishReadRecoveredState() {}
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index a35c8995d2a..71b2c43f330 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -80,6 +80,11 @@ public class MockInputGate extends IndexedInputGate {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public void finishReadRecoveredState() {}
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index da3d2551934..619873c387d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -263,6 +263,11 @@ class AlignedCheckpointsMassiveRandomTest {
             return CompletableFuture.completedFuture(null);
         }
 
+        @Override
+        public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
+            return CompletableFuture.completedFuture(null);
+        }
+
         @Override
         public void finishReadRecoveredState() {}
 

Reply via email to