This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3aef0932ded1a1ece9943915c681e1fe33433700 Author: Rui Fan <[email protected]> AuthorDate: Wed Feb 18 21:26:32 2026 +0100 [FLINK-39018][checkpoint] Support LocalInputChannel checkpoint snapshot for recovered buffers --- .../partition/consumer/LocalInputChannel.java | 12 +++++- .../partition/consumer/LocalInputChannelTest.java | 43 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index a7628d8c52f..616071c450b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -46,7 +46,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; -import java.util.Collections; +import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Optional; @@ -113,7 +113,15 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit // ------------------------------------------------------------------------ public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { - channelStatePersister.startPersisting(barrier.getId(), Collections.emptyList()); + // Collect inflight buffers from toBeConsumedBuffers to be persisted. + // These are buffers that have not been consumed yet when the checkpoint barrier arrives. + List<Buffer> inflightBuffers = new ArrayList<>(); + for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) { + if (bufferAndBacklog.buffer().isBuffer()) { + inflightBuffers.add(bufferAndBacklog.buffer().retainBuffer()); + } + } + channelStatePersister.startPersisting(barrier.getId(), inflightBuffers); } public void checkpointStopped(long checkpointId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 2e0be6ae7b8..83a94e79cd1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -713,6 +713,49 @@ class LocalInputChannelTest { assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5); } + @Test + void testCheckpointStartedPersistsRecoveredBuffers() throws Exception { + // given: Local input channel with recovered buffers + SingleInputGate inputGate = new SingleInputGateBuilder().build(); + + ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(10)); + recoveredBuffers.add(TestBufferFactory.createBuffer(20)); + recoveredBuffers.add(TestBufferFactory.createBuffer(30)); + + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + new ResultPartitionManager(), + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + stateWriter, + recoveredBuffers); + + inputGate.setInputChannels(channel); + + // when: Checkpoint is started + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + stateWriter.start(1L, options); + CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, options); + channel.checkpointStarted(barrier); + + // then: All 3 recovered buffers should be persisted as inflight data + List<Buffer> persistedBuffers = stateWriter.getAddedInput().get(channel.getChannelInfo()); + assertThat(persistedBuffers).isNotNull().hasSize(3); + assertThat(persistedBuffers.stream().mapToInt(Buffer::getSize).toArray()) + .containsExactly(10, 20, 30); + } + // --------------------------------------------------------------------------------------------- /** Returns the configured number of buffers for each channel in a random order. */
