This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e39b252d383b47c2119551df89d90b8212eb6361
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:40 2026 +0100

    [FLINK-38543][checkpoint] Change overall UC restore process for checkpoint during recovery
---
 .../configuration/CheckpointingOptionsTest.java    |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 25 ++++++----
 .../partition/consumer/SingleInputGateBuilder.java |  8 +++
 .../partition/consumer/SingleInputGateTest.java    | 30 +++++++++++
 .../partition/consumer/UnionInputGateTest.java     | 58 ++++++++++++++++++++++
 5 files changed, 112 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java
index 75c4ec07d23..7f895ef1e51 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java
@@ -330,7 +330,7 @@ class CheckpointingOptionsTest {
     }
 
     @Test
-    void testIsUnalignedDuringRecoveryEnabled() {
+    void testIsCheckpointingDuringRecoveryEnabled() {
         // Test when both options are disabled (default) - should return false
         Configuration defaultConfig = new Configuration();
         
assertThat(CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(defaultConfig))
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5ca9a5662e3..7938a1ef278 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -883,6 +883,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
         boolean checkpointingDuringRecoveryEnabled =
                 
CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
+
+        // Must set the flag on input gates BEFORE starting the async read 
task, because
+        // finishReadRecoveredState() checks this flag to complete 
bufferFilteringCompleteFuture.
         for (IndexedInputGate inputGate : inputGates) {
             
inputGate.setCheckpointingDuringRecoveryEnabled(checkpointingDuringRecoveryEnabled);
         }
@@ -899,18 +902,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
         // We wait for all input channel state to recover before we go into 
RUNNING state, and thus
         // start checkpointing. If we implement incremental checkpointing of 
input channel state
-        // we must make sure it supports CheckpointType#FULL_CHECKPOINT
+        // we must make sure it supports CheckpointType#FULL_CHECKPOINT.
         List<CompletableFuture<?>> recoveredFutures = new 
ArrayList<>(inputGates.length);
         for (InputGate inputGate : inputGates) {
-            recoveredFutures.add(inputGate.getStateConsumedFuture());
-
-            inputGate
-                    .getStateConsumedFuture()
-                    .thenRun(
-                            () ->
-                                    mainMailboxExecutor.execute(
-                                            inputGate::requestPartitions,
-                                            "Input gate request partitions"));
+            CompletableFuture<?> requestPartitionsTrigger =
+                    checkpointingDuringRecoveryEnabled
+                            ? inputGate.getBufferFilteringCompleteFuture()
+                            : inputGate.getStateConsumedFuture();
+
+            recoveredFutures.add(requestPartitionsTrigger);
+
+            requestPartitionsTrigger.thenRun(
+                    () ->
+                            mainMailboxExecutor.execute(
+                                    inputGate::requestPartitions, "Input gate 
request partitions"));
         }
 
         // Return allOf future instead of thenRun future. thenRun() returns a 
NEW future that
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index e4a4c289dc6..a4da811f8a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -83,6 +83,8 @@ public class SingleInputGateBuilder {
 
     private TieredStorageConsumerClient tieredStorageConsumerClient = null;
 
+    private boolean isCheckpointingDuringRecoveryEnabled = false;
+
     public SingleInputGateBuilder setPartitionProducerStateProvider(
             PartitionProducerStateProvider partitionProducerStateProvider) {
 
@@ -167,6 +169,11 @@ public class SingleInputGateBuilder {
         return this;
     }
 
+    public SingleInputGateBuilder 
setCheckpointingDuringRecoveryEnabled(boolean enabled) {
+        this.isCheckpointingDuringRecoveryEnabled = enabled; // applied to the gate in build()
+        return this;
+    }
+
     public SingleInputGate build() {
         SingleInputGate gate =
                 new SingleInputGate(
@@ -195,6 +202,7 @@ public class SingleInputGateBuilder {
                             .toArray(InputChannel[]::new));
         }
         gate.setTieredStorageService(null, tieredStorageConsumerClient, null);
+        
gate.setCheckpointingDuringRecoveryEnabled(isCheckpointingDuringRecoveryEnabled);
         return gate;
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index f7f0b744fb9..b2cc9d7ce3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -142,6 +142,36 @@ class SingleInputGateTest extends InputGateTestBase {
                 .isInstanceOf(CheckpointException.class);
     }
 
+    @Test
+    void testBufferFilteringCompleteFutureAggregation() throws Exception {
+        final NettyShuffleEnvironment environment = 
createNettyShuffleEnvironment();
+        final SingleInputGate inputGate = createInputGate(environment);
+        try (Closer closer = Closer.create()) {
+            closer.register(environment::close);
+            closer.register(inputGate::close);
+
+            // Enable checkpointing during recovery for this test so that
+            // bufferFilteringCompleteFuture is completed by 
finishReadRecoveredState()
+            inputGate.setCheckpointingDuringRecoveryEnabled(true);
+            inputGate.setup();
+
+            // Initially, bufferFilteringCompleteFuture should not be completed
            
assertThat(inputGate.getBufferFilteringCompleteFuture()).isNotDone();
+
+            // After finishing read recovered state, 
bufferFilteringCompleteFuture should be
+            // completed (only because checkpointing during recovery is enabled)
+            inputGate.finishReadRecoveredState();
+            assertThat(inputGate.getBufferFilteringCompleteFuture()).isDone();
+
+            // stateConsumedFuture should not be completed until data is 
consumed
+            assertThat(inputGate.getStateConsumedFuture()).isNotDone();
+
+            // Consuming the EndOfInputChannelStateEvent should complete 
stateConsumedFuture
+            inputGate.pollNext();
+            assertThat(inputGate.getStateConsumedFuture()).isDone();
+        }
+    }
+
     /**
      * Tests {@link InputGate#setup()} should create the respective {@link 
BufferPool} and assign
      * exclusive buffers for {@link RemoteInputChannel}s, but should not 
request partitions.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 419246137e8..1ed1a42a66e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager;
 
 import org.junit.jupiter.api.Test;
@@ -275,6 +277,62 @@ class UnionInputGateTest extends InputGateTestBase {
         assertThat(unionInputGate.getChannel(1)).isEqualTo(inputChannel2);
     }
 
+    @Test
+    void testBufferFilteringCompleteFutureAggregation() throws IOException {
+        // Create 2 SingleInputGates, each with 1 RecoveredInputChannel
+        SingleInputGate ig1 =
+                new 
SingleInputGateBuilder().setCheckpointingDuringRecoveryEnabled(true).build();
+        RecoveredInputChannel channel1 = buildRecoveredChannel(ig1);
+        ig1.setInputChannels(channel1);
+
+        SingleInputGate ig2 =
+                new SingleInputGateBuilder()
+                        .setSingleInputGateIndex(1)
+                        .setCheckpointingDuringRecoveryEnabled(true)
+                        .build();
+        RecoveredInputChannel channel2 = buildRecoveredChannel(ig2);
+        ig2.setInputChannels(channel2);
+
+        UnionInputGate union = new UnionInputGate(ig1, ig2);
+
+        // Initially, neither aggregated future of the union gate is done
+        assertThat(union.getBufferFilteringCompleteFuture()).isNotDone();
+        assertThat(union.getStateConsumedFuture()).isNotDone();
+
+        // Finish buffer filtering on the first gate only: the union must still wait for ig2
+        channel1.finishReadRecoveredState();
+        assertThat(ig1.getBufferFilteringCompleteFuture()).isDone();
+        assertThat(union.getBufferFilteringCompleteFuture()).isNotDone();
+
+        // Finish buffer filtering on the second gate: once both gates are done, so is the union
+        channel2.finishReadRecoveredState();
+        assertThat(ig2.getBufferFilteringCompleteFuture()).isDone();
+        assertThat(union.getBufferFilteringCompleteFuture()).isDone();
+
+        // State consumed futures should still NOT be done (state not consumed 
yet)
+        assertThat(union.getStateConsumedFuture()).isNotDone();
+    }
+
+    private static RecoveredInputChannel buildRecoveredChannel(SingleInputGate 
inputGate) {
+        return new RecoveredInputChannel(
+                inputGate,
+                0,
+                new ResultPartitionID(),
+                new ResultSubpartitionIndexSet(0),
+                0,
+                0,
+                new SimpleCounter(),
+                new SimpleCounter(),
+                10) {
+            @Override
+            protected InputChannel toInputChannelInternal(
                    
java.util.ArrayDeque<org.apache.flink.runtime.io.network.buffer.Buffer>
+                            remainingBuffers) {
+                throw new UnsupportedOperationException(); // not needed by these tests
+            }
+        };
+    }
+
     @Test
     void testEmptyPull() throws IOException, InterruptedException {
         final SingleInputGate inputGate1 = createInputGate(1);

Reply via email to