This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aaab864c24d77979f8ae06771f30024ed323547e Author: Rui Fan <[email protected]> AuthorDate: Wed Feb 18 21:26:40 2026 +0100 [FLINK-38543][checkpoint] Introduce bufferFilteringCompleteFuture for earlier RUNNING state transition --- .../io/network/partition/consumer/InputGate.java | 7 + .../partition/consumer/RecoveredInputChannel.java | 41 +++++- .../partition/consumer/SingleInputGate.java | 15 +++ .../network/partition/consumer/UnionInputGate.java | 9 ++ .../runtime/taskmanager/InputGateWithMetrics.java | 5 + .../consumer/RecoveredInputChannelTest.java | 143 ++++++++++++++++++++- .../streaming/runtime/io/MockIndexedInputGate.java | 5 + .../flink/streaming/runtime/io/MockInputGate.java | 5 + .../AlignedCheckpointsMassiveRandomTest.java | 5 + 9 files changed, 225 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 11d22a8df4a..dd744bae330 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -192,5 +192,12 @@ public abstract class InputGate public abstract CompletableFuture<Void> getStateConsumedFuture(); + /** + * Returns a future that completes when buffer filtering is complete for all channels. This + * future completes before {@link #getStateConsumedFuture()}, enabling earlier RUNNING state + * transition when unaligned checkpoint during recovery is enabled. + */ + public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture(); + public abstract void finishReadRecoveredState() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java index d2a7a07137d..d9b7885815b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java @@ -62,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>(); protected final BufferManager bufferManager; + /** + * Future that completes when recovered buffers have been filtered for this channel. This + * completes before stateConsumedFuture, enabling earlier RUNNING state transition when + * unaligned checkpoint during recovery is enabled. + */ + private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>(); + @GuardedBy("receivedBuffers") private boolean isReleased; @@ -110,7 +117,11 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan public final InputChannel toInputChannel() throws IOException { Preconditions.checkState( - stateConsumedFuture.isDone(), "recovered state is not fully consumed"); + bufferFilteringCompleteFuture.isDone(), "buffer filtering is not complete"); + if (!inputGate.isCheckpointingDuringRecoveryEnabled()) { + Preconditions.checkState( + stateConsumedFuture.isDone(), "recovered state is not fully consumed"); + } // Extract remaining buffers before conversion. // These buffers have been filtered but not yet consumed by the Task. @@ -140,6 +151,14 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan protected abstract InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers) throws IOException; + /** + * Returns the future that completes when buffer filtering is complete. This future completes + * before stateConsumedFuture, at the point when finishReadRecoveredState() is called. + */ + CompletableFuture<Void> getBufferFilteringCompleteFuture() { + return bufferFilteringCompleteFuture; + } + CompletableFuture<?> getStateConsumedFuture() { return stateConsumedFuture; } @@ -176,8 +195,22 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan } public void finishReadRecoveredState() throws IOException { - onRecoveredStateBuffer( - EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false)); + // Adding the event and completing the future must be atomic under receivedBuffers lock. + // Without this, either ordering has a race: + // - event first: task thread consumes EndOfInputChannelStateEvent, which completes + // stateConsumedFuture. When checkpointing during recovery is disabled, + // stateConsumedFuture triggers requestPartitions -> toInputChannel(), which + // fails because bufferFilteringCompleteFuture is not yet done. + // - future first: toInputChannel() extracts buffers before the event is added, + // losing the EndOfInputChannelStateEvent. + // Both toInputChannel() and getNextRecoveredStateBuffer() synchronize on + // receivedBuffers, so holding the same lock here guarantees + // bufferFilteringCompleteFuture is always done before stateConsumedFuture. + synchronized (receivedBuffers) { + onRecoveredStateBuffer( + EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false)); + bufferFilteringCompleteFuture.complete(null); + } bufferManager.releaseFloatingBuffers(); LOG.debug("{}/{} finished recovering input.", inputGate.getOwningTaskName(), channelInfo); } @@ -196,6 +229,8 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan if (next == null) { return null; } else if (isEndOfInputChannelStateEvent(next)) { + Preconditions.checkState( + bufferFilteringCompleteFuture.isDone(), "buffer filtering is not complete"); stateConsumedFuture.complete(null); return null; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 2847e36fcc2..438efa2f58b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -341,6 +341,21 @@ public class SingleInputGate extends IndexedInputGate { return checkpointingDuringRecoveryEnabled; } + @Override + public CompletableFuture<Void> getBufferFilteringCompleteFuture() { + synchronized (requestLock) { + List<CompletableFuture<?>> futures = new ArrayList<>(numberOfInputChannels); + for (InputChannel inputChannel : inputChannels()) { + if (inputChannel instanceof RecoveredInputChannel) { + futures.add( + ((RecoveredInputChannel) inputChannel) + .getBufferFilteringCompleteFuture()); + } + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } + } + @Override public void requestPartitions() { synchronized (requestLock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 6c7c765938b..dda71c63be3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -350,6 +350,15 @@ public class UnionInputGate extends InputGate { .toArray(new CompletableFuture[] {})); } + @Override + public CompletableFuture<Void> getBufferFilteringCompleteFuture() { + return CompletableFuture.allOf( + inputGatesByGateIndex.values().stream() + .map(InputGate::getBufferFilteringCompleteFuture) + .collect(Collectors.toList()) + .toArray(new CompletableFuture[] {})); + } + @Override public void requestPartitions() throws IOException { for (InputGate inputGate : inputGatesByGateIndex.values()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index 31775fb21de..bff412f53b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -120,6 +120,11 @@ public class InputGateWithMetrics extends IndexedInputGate { return inputGate.getStateConsumedFuture(); } + @Override + public CompletableFuture<Void> getBufferFilteringCompleteFuture() { + return inputGate.getBufferFilteringCompleteFuture(); + } + @Override public void requestPartitions() throws IOException { inputGate.requestPartitions(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java index 5985a81e8ca..f40fd09702e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java @@ -28,24 +28,30 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.ArrayDeque; import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned; import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link RecoveredInputChannel}. */ class RecoveredInputChannelTest { @Test - void testConversionOnlyPossibleAfterConsumed() { - assertThatThrownBy(() -> buildChannel().toInputChannel()) - .isInstanceOf(IllegalStateException.class); + void testConversionOnlyPossibleAfterBufferFilteringComplete() { + // toInputChannel() always checks bufferFilteringCompleteFuture regardless of config + for (boolean configEnabled : new boolean[] {true, false}) { + assertThatThrownBy(() -> buildChannel(configEnabled).toInputChannel()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("buffer filtering is not complete"); + } } @Test void testRequestPartitionsImpossible() { - assertThatThrownBy(() -> buildChannel().requestSubpartitions()) + assertThatThrownBy(() -> buildChannel(false).requestSubpartitions()) .isInstanceOf(UnsupportedOperationException.class); } @@ -53,7 +59,7 @@ class RecoveredInputChannelTest { void testCheckpointStartImpossible() { assertThatThrownBy( () -> - buildChannel() + buildChannel(false) .checkpointStarted( new CheckpointBarrier( 0L, @@ -64,10 +70,96 @@ class RecoveredInputChannelTest { .isInstanceOf(CheckpointException.class); } - private RecoveredInputChannel buildChannel() { + @Test + void testToInputChannelAllowedWhenBufferFilteringCompleteAndConfigEnabled() throws IOException { + // When config is enabled, conversion is allowed when bufferFilteringCompleteFuture is done + TestableRecoveredInputChannel channel = buildTestableChannel(true); + + // Initially, conversion should fail + assertThatThrownBy(() -> channel.toInputChannel()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("buffer filtering is not complete"); + + // After finishReadRecoveredState(), bufferFilteringCompleteFuture should be done + channel.finishReadRecoveredState(); + assertThat(channel.getBufferFilteringCompleteFuture()).isDone(); + assertThat(channel.getStateConsumedFuture()).isNotDone(); + + // Conversion should now succeed (no exception) + InputChannel converted = channel.toInputChannel(); + assertThat(converted).isNotNull(); + } + + @Test + void testToInputChannelAllowedWhenStateConsumedAndConfigDisabled() throws IOException { + // When config is disabled, conversion requires both bufferFilteringCompleteFuture + // and stateConsumedFuture to be done + TestableRecoveredInputChannel channel = buildTestableChannel(false); + + // Initially, conversion should fail (buffer filtering not complete) + assertThatThrownBy(() -> channel.toInputChannel()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("buffer filtering is not complete"); + + // After finishReadRecoveredState(), bufferFilteringCompleteFuture is done + // but stateConsumedFuture is not + channel.finishReadRecoveredState(); + assertThat(channel.getBufferFilteringCompleteFuture()).isDone(); + assertThat(channel.getStateConsumedFuture()).isNotDone(); + + // Conversion should still fail because stateConsumedFuture is not done + assertThatThrownBy(() -> channel.toInputChannel()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("recovered state is not fully consumed"); + + // Consume the EndOfInputChannelStateEvent to complete stateConsumedFuture + assertThat(channel.getNextBuffer()).isNotPresent(); + assertThat(channel.getStateConsumedFuture()).isDone(); + + // Now conversion should succeed + InputChannel converted = channel.toInputChannel(); + assertThat(converted).isNotNull(); + } + + @Test + void testBufferFilteringCompleteFutureAlwaysCompletes() throws IOException { + // finishReadRecoveredState() unconditionally completes bufferFilteringCompleteFuture + for (boolean configEnabled : new boolean[] {true, false}) { + RecoveredInputChannel channel = buildChannel(configEnabled); + assertThat(channel.getBufferFilteringCompleteFuture()).isNotDone(); + channel.finishReadRecoveredState(); + assertThat(channel.getBufferFilteringCompleteFuture()).isDone(); + } + } + + @Test + void testStateConsumedFutureCompletesAfterConsumingAllBuffers() throws IOException { + // This test verifies that stateConsumedFuture completes after consuming + // EndOfInputChannelStateEvent regardless of the config setting + for (boolean configEnabled : new boolean[] {true, false}) { + RecoveredInputChannel channel = buildChannel(configEnabled); + + assertThat(channel.getStateConsumedFuture()).isNotDone(); + + channel.finishReadRecoveredState(); + assertThat(channel.getStateConsumedFuture()).isNotDone(); + + // Consuming the EndOfInputChannelStateEvent should complete the future. + // getNextBuffer() returns empty when it encounters the event internally. + assertThat(channel.getNextBuffer()).isNotPresent(); + assertThat(channel.getStateConsumedFuture()).isDone(); + } + } + + private RecoveredInputChannel buildChannel(boolean checkpointingDuringRecoveryEnabled) { try { + SingleInputGate inputGate = + new SingleInputGateBuilder() + .setCheckpointingDuringRecoveryEnabled( + checkpointingDuringRecoveryEnabled) + .build(); return new RecoveredInputChannel( - new SingleInputGateBuilder().build(), + inputGate, 0, new ResultPartitionID(), new ResultSubpartitionIndexSet(0), @@ -85,4 +177,41 @@ class RecoveredInputChannelTest { throw new AssertionError("channel creation failed", e); } } + + private TestableRecoveredInputChannel buildTestableChannel( + boolean checkpointingDuringRecoveryEnabled) { + try { + SingleInputGate inputGate = + new SingleInputGateBuilder() + .setCheckpointingDuringRecoveryEnabled( + checkpointingDuringRecoveryEnabled) + .build(); + return new TestableRecoveredInputChannel(inputGate); + } catch (Exception e) { + throw new AssertionError("channel creation failed", e); + } + } + + /** + * A RecoveredInputChannel that returns a TestInputChannel when converted, for testing purposes. + */ + private static class TestableRecoveredInputChannel extends RecoveredInputChannel { + TestableRecoveredInputChannel(SingleInputGate inputGate) { + super( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + 10); + } + + @Override + protected InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers) { + return new TestInputChannel(inputGate, 0); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java index 618f31e518f..584aeb7eb90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java @@ -57,6 +57,11 @@ public class MockIndexedInputGate extends IndexedInputGate { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<Void> getBufferFilteringCompleteFuture() { + return CompletableFuture.completedFuture(null); + } + @Override public void finishReadRecoveredState() {} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index a35c8995d2a..71b2c43f330 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -80,6 +80,11 @@ public class MockInputGate extends IndexedInputGate { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<Void> getBufferFilteringCompleteFuture() { + return CompletableFuture.completedFuture(null); + } + @Override public void finishReadRecoveredState() {} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java index da3d2551934..619873c387d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java @@ -263,6 +263,11 @@ class AlignedCheckpointsMassiveRandomTest { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<Void> getBufferFilteringCompleteFuture() { + return CompletableFuture.completedFuture(null); + } + @Override public void finishReadRecoveredState() {}
