This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e39b252d383b47c2119551df89d90b8212eb6361 Author: Rui Fan <[email protected]> AuthorDate: Wed Feb 18 21:26:40 2026 +0100 [FLINK-38543][checkpoint] Change overall UC restore process for checkpoint during recovery --- .../configuration/CheckpointingOptionsTest.java | 2 +- .../flink/streaming/runtime/tasks/StreamTask.java | 25 ++++++---- .../partition/consumer/SingleInputGateBuilder.java | 8 +++ .../partition/consumer/SingleInputGateTest.java | 30 +++++++++++ .../partition/consumer/UnionInputGateTest.java | 58 ++++++++++++++++++++++ 5 files changed, 112 insertions(+), 11 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java index 75c4ec07d23..7f895ef1e51 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java @@ -330,7 +330,7 @@ class CheckpointingOptionsTest { } @Test - void testIsUnalignedDuringRecoveryEnabled() { + void testIsCheckpointingDuringRecoveryEnabled() { // Test when both options are disabled (default) - should return false Configuration defaultConfig = new Configuration(); assertThat(CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(defaultConfig)) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 5ca9a5662e3..7938a1ef278 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -883,6 +883,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> boolean checkpointingDuringRecoveryEnabled = CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration()); + + // 
Must set the flag on input gates BEFORE starting the async read task, because + // finishReadRecoveredState() checks this flag to complete bufferFilteringCompleteFuture. for (IndexedInputGate inputGate : inputGates) { inputGate.setCheckpointingDuringRecoveryEnabled(checkpointingDuringRecoveryEnabled); } @@ -899,18 +902,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // We wait for all input channel state to recover before we go into RUNNING state, and thus // start checkpointing. If we implement incremental checkpointing of input channel state - // we must make sure it supports CheckpointType#FULL_CHECKPOINT + // we must make sure it supports CheckpointType#FULL_CHECKPOINT. List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length); for (InputGate inputGate : inputGates) { - recoveredFutures.add(inputGate.getStateConsumedFuture()); - - inputGate - .getStateConsumedFuture() - .thenRun( - () -> - mainMailboxExecutor.execute( - inputGate::requestPartitions, - "Input gate request partitions")); + CompletableFuture<?> requestPartitionsTrigger = + checkpointingDuringRecoveryEnabled + ? inputGate.getBufferFilteringCompleteFuture() + : inputGate.getStateConsumedFuture(); + + recoveredFutures.add(requestPartitionsTrigger); + + requestPartitionsTrigger.thenRun( + () -> + mainMailboxExecutor.execute( + inputGate::requestPartitions, "Input gate request partitions")); } // Return allOf future instead of thenRun future. 
thenRun() returns a NEW future that diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index e4a4c289dc6..a4da811f8a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -83,6 +83,8 @@ public class SingleInputGateBuilder { private TieredStorageConsumerClient tieredStorageConsumerClient = null; + private boolean isCheckpointingDuringRecoveryEnabled = false; + public SingleInputGateBuilder setPartitionProducerStateProvider( PartitionProducerStateProvider partitionProducerStateProvider) { @@ -167,6 +169,11 @@ public class SingleInputGateBuilder { return this; } + public SingleInputGateBuilder setCheckpointingDuringRecoveryEnabled(boolean enabled) { + this.isCheckpointingDuringRecoveryEnabled = enabled; + return this; + } + public SingleInputGate build() { SingleInputGate gate = new SingleInputGate( @@ -195,6 +202,7 @@ public class SingleInputGateBuilder { .toArray(InputChannel[]::new)); } gate.setTieredStorageService(null, tieredStorageConsumerClient, null); + gate.setCheckpointingDuringRecoveryEnabled(isCheckpointingDuringRecoveryEnabled); return gate; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index f7f0b744fb9..b2cc9d7ce3c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -142,6 +142,36 @@ class SingleInputGateTest extends 
InputGateTestBase { .isInstanceOf(CheckpointException.class); } + @Test + void testBufferFilteringCompleteFutureAggregation() throws Exception { + final NettyShuffleEnvironment environment = createNettyShuffleEnvironment(); + final SingleInputGate inputGate = createInputGate(environment); + try (Closer closer = Closer.create()) { + closer.register(environment::close); + closer.register(inputGate::close); + + // Enable checkpointing during recovery for this test so that + // bufferFilteringCompleteFuture is completed by finishReadRecoveredState() + inputGate.setCheckpointingDuringRecoveryEnabled(true); + inputGate.setup(); + + // Initially, the aggregated future should not be completed + assertThat(inputGate.getBufferFilteringCompleteFuture()).isNotDone(); + + // After finishing read recovered state, bufferFilteringCompleteFuture should be + // completed (only when config is enabled) + inputGate.finishReadRecoveredState(); + assertThat(inputGate.getBufferFilteringCompleteFuture()).isDone(); + + // stateConsumedFuture should not be completed until data is consumed + assertThat(inputGate.getStateConsumedFuture()).isNotDone(); + + // Consuming the EndOfInputChannelStateEvent should complete stateConsumedFuture + inputGate.pollNext(); + assertThat(inputGate.getStateConsumedFuture()).isDone(); + } + } + /** * Tests {@link InputGate#setup()} should create the respective {@link BufferPool} and assign * exclusive buffers for {@link RemoteInputChannel}s, but should not request partitions. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 419246137e8..1ed1a42a66e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -18,12 +18,14 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.PullingAsyncDataInput; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager; import org.junit.jupiter.api.Test; @@ -275,6 +277,62 @@ class UnionInputGateTest extends InputGateTestBase { assertThat(unionInputGate.getChannel(1)).isEqualTo(inputChannel2); } + @Test + void testBufferFilteringCompleteFutureAggregation() throws IOException { + // Create 2 SingleInputGates, each with 1 RecoveredInputChannel + SingleInputGate ig1 = + new SingleInputGateBuilder().setCheckpointingDuringRecoveryEnabled(true).build(); + RecoveredInputChannel channel1 = buildRecoveredChannel(ig1); + ig1.setInputChannels(channel1); + + SingleInputGate ig2 = + new SingleInputGateBuilder() + .setSingleInputGateIndex(1) + .setCheckpointingDuringRecoveryEnabled(true) + .build(); + RecoveredInputChannel channel2 = buildRecoveredChannel(ig2); + 
ig2.setInputChannels(channel2); + + UnionInputGate union = new UnionInputGate(ig1, ig2); + + // Initially, bufferFilteringCompleteFuture should not be done + assertThat(union.getBufferFilteringCompleteFuture()).isNotDone(); + assertThat(union.getStateConsumedFuture()).isNotDone(); + + // Complete buffer filtering on first gate only + channel1.finishReadRecoveredState(); + assertThat(ig1.getBufferFilteringCompleteFuture()).isDone(); + assertThat(union.getBufferFilteringCompleteFuture()).isNotDone(); + + // Complete buffer filtering on second gate + channel2.finishReadRecoveredState(); + assertThat(ig2.getBufferFilteringCompleteFuture()).isDone(); + assertThat(union.getBufferFilteringCompleteFuture()).isDone(); + + // State consumed futures should still NOT be done (state not consumed yet) + assertThat(union.getStateConsumedFuture()).isNotDone(); + } + + private static RecoveredInputChannel buildRecoveredChannel(SingleInputGate inputGate) { + return new RecoveredInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + 10) { + @Override + protected InputChannel toInputChannelInternal( + java.util.ArrayDeque<org.apache.flink.runtime.io.network.buffer.Buffer> + remainingBuffers) { + throw new UnsupportedOperationException(); + } + }; + } + @Test void testEmptyPull() throws IOException, InterruptedException { final SingleInputGate inputGate1 = createInputGate(1);
