This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3aef0932ded1a1ece9943915c681e1fe33433700
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:32 2026 +0100

    [FLINK-39018][checkpoint] Support LocalInputChannel checkpoint snapshot for recovered buffers
---
 .../partition/consumer/LocalInputChannel.java      | 12 +++++-
 .../partition/consumer/LocalInputChannelTest.java  | 43 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index a7628d8c52f..616071c450b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -46,7 +46,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
@@ -113,7 +113,15 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
     // ------------------------------------------------------------------------
 
     public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
-        channelStatePersister.startPersisting(barrier.getId(), 
Collections.emptyList());
+        // Collect inflight buffers from toBeConsumedBuffers to be persisted.
+        // These are buffers that have not been consumed yet when the 
checkpoint barrier arrives.
+        List<Buffer> inflightBuffers = new ArrayList<>();
+        for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) {
+            if (bufferAndBacklog.buffer().isBuffer()) {
+                inflightBuffers.add(bufferAndBacklog.buffer().retainBuffer());
+            }
+        }
+        channelStatePersister.startPersisting(barrier.getId(), 
inflightBuffers);
     }
 
     public void checkpointStopped(long checkpointId) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 2e0be6ae7b8..83a94e79cd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -713,6 +713,49 @@ class LocalInputChannelTest {
         
assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5);
     }
 
+    @Test
+    void testCheckpointStartedPersistsRecoveredBuffers() throws Exception {
+        // given: Local input channel with recovered buffers
+        SingleInputGate inputGate = new SingleInputGateBuilder().build();
+
+        ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>();
+        recoveredBuffers.add(TestBufferFactory.createBuffer(10));
+        recoveredBuffers.add(TestBufferFactory.createBuffer(20));
+        recoveredBuffers.add(TestBufferFactory.createBuffer(30));
+
+        RecordingChannelStateWriter stateWriter = new 
RecordingChannelStateWriter();
+
+        LocalInputChannel channel =
+                new LocalInputChannel(
+                        inputGate,
+                        0,
+                        new ResultPartitionID(),
+                        new ResultSubpartitionIndexSet(0),
+                        new ResultPartitionManager(),
+                        new TaskEventDispatcher(),
+                        0,
+                        0,
+                        new SimpleCounter(),
+                        new SimpleCounter(),
+                        stateWriter,
+                        recoveredBuffers);
+
+        inputGate.setInputChannels(channel);
+
+        // when: Checkpoint is started
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault());
+        stateWriter.start(1L, options);
+        CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, options);
+        channel.checkpointStarted(barrier);
+
+        // then: All 3 recovered buffers should be persisted as inflight data
+        List<Buffer> persistedBuffers = 
stateWriter.getAddedInput().get(channel.getChannelInfo());
+        assertThat(persistedBuffers).isNotNull().hasSize(3);
+        
assertThat(persistedBuffers.stream().mapToInt(Buffer::getSize).toArray())
+                .containsExactly(10, 20, 30);
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     /** Returns the configured number of buffers for each channel in a random 
order. */

Reply via email to