This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cebc174ad5f86716aba5b54017269e7263b6dec0
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:32 2026 +0100

    [FLINK-39018][network] Fix LocalInputChannel priority event and buffer 
availability for recovered buffers
---
 .../partition/consumer/LocalInputChannel.java      |  81 +++++++-
 .../partition/consumer/LocalInputChannelTest.java  | 205 +++++++++++++++++++++
 2 files changed, 283 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 616071c450b..2833adecb58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -80,6 +80,13 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
     private final Deque<BufferAndBacklog> toBeConsumedBuffers = new 
ArrayDeque<>();
 
+    /**
+     * Flag indicating whether there is a pending priority event (e.g., 
checkpoint barrier) in the
+     * subpartitionView that should be consumed before toBeConsumedBuffers. 
This is set by {@link
+     * #notifyPriorityEvent} and checked in {@link #getNextBuffer()}.
+     */
+    private volatile boolean hasPendingPriorityEvent = false;
+
     public LocalInputChannel(
             SingleInputGate inputGate,
             int channelIndex,
@@ -130,8 +137,6 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
     @Override
     protected void requestSubpartitions() throws IOException {
-        checkState(toBeConsumedBuffers.isEmpty());
-
         boolean retriggerRequest = false;
         boolean notifyDataAvailable = false;
 
@@ -242,7 +247,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
         checkError();
 
         if (!toBeConsumedBuffers.isEmpty()) {
-            return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
+            return getNextRecoveredBuffer();
         }
 
         ResultSubpartitionView subpartitionView = this.subpartitionView;
@@ -304,6 +309,68 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
         return getBufferAndAvailability(next);
     }
 
+    /**
+     * Consumes the next buffer from toBeConsumedBuffers (recovered buffers), 
handling pending
+     * priority events and dynamic availability detection for the last 
recovered buffer.
+     */
+    private Optional<BufferAndAvailability> getNextRecoveredBuffer() throws 
IOException {
+        // If there is a pending priority event (e.g., unaligned checkpoint 
barrier), fetch it
+        // from subpartitionView first, skipping toBeConsumedBuffers. This 
ensures priority
+        // events are processed immediately even when there are pending 
recovered buffers.
+        if (hasPendingPriorityEvent) {
+            checkState(subpartitionView != null, "No subpartition view 
available");
+            BufferAndBacklog next = subpartitionView.getNextBuffer();
+            checkState(
+                    next != null && next.buffer().getDataType().hasPriority(),
+                    "Expected priority event, but got %s",
+                    next == null ? "null" : next.buffer().getDataType());
+
+            // Check for barrier to update channel state persister.
+            // Note: maybePersist is not needed for barriers as they are not 
regular data buffers.
+            channelStatePersister.checkForBarrier(next.buffer());
+
+            Buffer.DataType expectedNextDataType = next.getNextDataType();
+            if (!expectedNextDataType.hasPriority()) {
+                // Reset hasPendingPriorityEvent to false if no more priority 
event
+                hasPendingPriorityEvent = false;
+                if (!toBeConsumedBuffers.isEmpty()) {
+                    // Correct nextDataType: if toBeConsumedBuffers is not 
empty, the actual next
+                    // element to consume is from toBeConsumedBuffers, not 
from subpartitionView
+                    expectedNextDataType = 
toBeConsumedBuffers.peek().buffer().getDataType();
+                }
+            }
+
+            return getBufferAndAvailability(
+                    new BufferAndBacklog(
+                            next.buffer(),
+                            next.buffersInBacklog(),
+                            expectedNextDataType,
+                            next.getSequenceNumber()));
+        }
+
+        BufferAndBacklog next = toBeConsumedBuffers.removeFirst();
+
+        // If this is the last recovered buffer and nextDataType is NONE,
+        // dynamically check if subpartitionView has data available.
+        // The last buffer's nextDataType was preset to NONE during 
construction,
+        // but subpartitionView may already have data available.
+        if (toBeConsumedBuffers.isEmpty()
+                && next.getNextDataType() == Buffer.DataType.NONE
+                && subpartitionView != null) {
+            ResultSubpartitionView.AvailabilityWithBacklog availability =
+                    subpartitionView.getAvailabilityAndBacklog(true);
+            if (availability.isAvailable()) {
+                next =
+                        new BufferAndBacklog(
+                                next.buffer(),
+                                availability.getBacklog(),
+                                Buffer.DataType.DATA_BUFFER,
+                                next.getSequenceNumber());
+            }
+        }
+        return getBufferAndAvailability(next);
+    }
+
     private Optional<BufferAndAvailability> 
getBufferAndAvailability(BufferAndBacklog next)
             throws IOException {
         Buffer buffer = next.buffer();
@@ -339,6 +406,14 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
         notifyChannelNonEmpty();
     }
 
+    @Override
+    public void notifyPriorityEvent(int prioritySequenceNumber) {
+        // Set flag so that getNextBuffer() knows to fetch priority event from 
subpartitionView
+        // before consuming toBeConsumedBuffers.
+        hasPendingPriorityEvent = true;
+        super.notifyPriorityEvent(prioritySequenceNumber);
+    }
+
     private ResultSubpartitionView checkAndWaitForSubpartitionView() {
         // synchronizing on the request lock means this blocks until the 
asynchronous request
         // for the partition view has been completed
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 83a94e79cd1..aeb765f79b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -756,6 +756,211 @@ class LocalInputChannelTest {
                 .containsExactly(10, 20, 30);
     }
 
+    @Test
+    void testPriorityEventConsumedBeforeRecoveredBuffers() throws Exception {
+        RecordingChannelStateWriter stateWriter = new 
RecordingChannelStateWriter();
+        ChannelAndSubpartition ctx = 
createChannelWithRecoveredBuffers(stateWriter, 10, 20);
+
+        // when: A priority event (barrier) arrives while recovered buffers 
are still pending
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault());
+        CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, options);
+        ctx.subpartition.add(EventSerializer.toBufferConsumer(barrier, true));
+
+        ctx.channel.notifyPriorityEvent(0);
+
+        // then: The first buffer returned should be the priority event 
(barrier), not recovered
+        // data
+        Optional<InputChannel.BufferAndAvailability> firstResult = 
ctx.channel.getNextBuffer();
+        assertThat(firstResult).isPresent();
+        
assertThat(firstResult.get().buffer().getDataType().hasPriority()).isTrue();
+
+        // And the next buffers should be the recovered data
+        Optional<InputChannel.BufferAndAvailability> secondResult = 
ctx.channel.getNextBuffer();
+        assertThat(secondResult).isPresent();
+        assertThat(secondResult.get().buffer().isBuffer()).isTrue();
+        assertThat(secondResult.get().buffer().getSize()).isEqualTo(10);
+    }
+
+    @Test
+    void testPriorityEventFailsFastWhenSubpartitionViewIsNull() throws 
Exception {
+        // given: Local input channel with recovered buffers but NO 
subpartition view initialized
+        SingleInputGate inputGate = new SingleInputGateBuilder().build();
+
+        ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>();
+        recoveredBuffers.add(TestBufferFactory.createBuffer(10));
+
+        LocalInputChannel channel =
+                new LocalInputChannel(
+                        inputGate,
+                        0,
+                        new ResultPartitionID(),
+                        new ResultSubpartitionIndexSet(0),
+                        new ResultPartitionManager(),
+                        new TaskEventDispatcher(),
+                        0,
+                        0,
+                        new SimpleCounter(),
+                        new SimpleCounter(),
+                        ChannelStateWriter.NO_OP,
+                        recoveredBuffers);
+
+        inputGate.setInputChannels(channel);
+        // Do NOT call channel.requestSubpartitions() — subpartitionView stays 
null
+
+        channel.notifyPriorityEvent(0);
+
+        assertThatThrownBy(channel::getNextBuffer)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("No subpartition view available");
+    }
+
+    @Test
+    void testPriorityEventFailsFastWhenNonPriorityBufferReturned() throws 
Exception {
+        ChannelAndSubpartition ctx =
+                createChannelWithRecoveredBuffers(ChannelStateWriter.NO_OP, 
10);
+
+        // Add a non-priority data buffer to the subpartition
+        ctx.subpartition.add(createFilledFinishedBufferConsumer(32));
+        ctx.channel.notifyPriorityEvent(0);
+
+        assertThatThrownBy(ctx.channel::getNextBuffer)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Expected priority event");
+    }
+
+    @Test
+    void testPriorityEventFailsFastWhenSubpartitionViewReturnsNull() throws 
Exception {
+        ChannelAndSubpartition ctx =
+                createChannelWithRecoveredBuffers(ChannelStateWriter.NO_OP, 
10);
+
+        // Do NOT add any buffer to the subpartition — getNextBuffer() returns 
null
+        ctx.channel.notifyPriorityEvent(0);
+
+        assertThatThrownBy(ctx.channel::getNextBuffer)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Expected priority event, but got null");
+    }
+
+    @Test
+    void testMultipleConsecutivePriorityEvents() throws Exception {
+        RecordingChannelStateWriter stateWriter = new 
RecordingChannelStateWriter();
+        ChannelAndSubpartition ctx = 
createChannelWithRecoveredBuffers(stateWriter, 10);
+
+        // Add two priority events (barriers) to the subpartition
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault());
+        ctx.subpartition.add(
+                EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 0L, 
options), true));
+        ctx.subpartition.add(
+                EventSerializer.toBufferConsumer(new CheckpointBarrier(2L, 0L, 
options), true));
+
+        ctx.channel.notifyPriorityEvent(0);
+
+        // First getNextBuffer() should return the first barrier
+        Optional<InputChannel.BufferAndAvailability> first = 
ctx.channel.getNextBuffer();
+        assertThat(first).isPresent();
+        assertThat(first.get().buffer().getDataType().hasPriority()).isTrue();
+        assertThat(first.get().morePriorityEvents()).isTrue();
+
+        // Second getNextBuffer() should return the second barrier
+        Optional<InputChannel.BufferAndAvailability> second = 
ctx.channel.getNextBuffer();
+        assertThat(second).isPresent();
+        assertThat(second.get().buffer().getDataType().hasPriority()).isTrue();
+
+        // Third getNextBuffer() should return the recovered data buffer
+        Optional<InputChannel.BufferAndAvailability> third = 
ctx.channel.getNextBuffer();
+        assertThat(third).isPresent();
+        assertThat(third.get().buffer().isBuffer()).isTrue();
+        assertThat(third.get().buffer().getSize()).isEqualTo(10);
+    }
+
+    @Test
+    void testNextDataTypeCorrectedToRecoveredBufferType() throws Exception {
+        RecordingChannelStateWriter stateWriter = new 
RecordingChannelStateWriter();
+        ChannelAndSubpartition ctx = 
createChannelWithRecoveredBuffers(stateWriter, 10);
+
+        // Add a priority event followed by a data buffer in the subpartition
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault());
+        ctx.subpartition.add(
+                EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 0L, 
options), true));
+        ctx.subpartition.add(createFilledFinishedBufferConsumer(32));
+
+        ctx.channel.notifyPriorityEvent(0);
+
+        // getNextBuffer() returns the barrier
+        Optional<InputChannel.BufferAndAvailability> result = 
ctx.channel.getNextBuffer();
+        assertThat(result).isPresent();
+        assertThat(result.get().buffer().getDataType().hasPriority()).isTrue();
+
+        // The nextDataType should be corrected to DATA_BUFFER (from 
toBeConsumedBuffers),
+        // not whatever the subpartitionView reports.
+        assertThat(result.get().morePriorityEvents()).isFalse();
+        assertThat(result.get().moreAvailable()).isTrue();
+
+        // The next buffer should be the recovered data (not the subpartition 
data)
+        Optional<InputChannel.BufferAndAvailability> next = 
ctx.channel.getNextBuffer();
+        assertThat(next).isPresent();
+        assertThat(next.get().buffer().isBuffer()).isTrue();
+        assertThat(next.get().buffer().getSize()).isEqualTo(10);
+    }
+
+    /**
+     * Creates a LocalInputChannel with recovered buffers and a live 
subpartition, ready for
+     * priority event tests. The channel has already called 
requestSubpartitions().
+     */
+    private static ChannelAndSubpartition createChannelWithRecoveredBuffers(
+            ChannelStateWriter stateWriter, int... recoveredBufferSizes) 
throws Exception {
+        SingleInputGate inputGate = new SingleInputGateBuilder().build();
+
+        PipelinedResultPartition parent =
+                (PipelinedResultPartition)
+                        PartitionTestUtils.createPartition(
+                                ResultPartitionType.PIPELINED, 
NoOpFileChannelManager.INSTANCE);
+        ResultSubpartition subpartition = parent.getAllPartitions()[0];
+        ResultSubpartitionView subpartitionView =
+                subpartition.createReadView((ResultSubpartitionView view) -> 
{});
+
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(subpartitionView);
+
+        ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>();
+        for (int size : recoveredBufferSizes) {
+            recoveredBuffers.add(TestBufferFactory.createBuffer(size));
+        }
+
+        LocalInputChannel channel =
+                new LocalInputChannel(
+                        inputGate,
+                        0,
+                        parent.getPartitionId(),
+                        new ResultSubpartitionIndexSet(0),
+                        partitionManager,
+                        new TaskEventDispatcher(),
+                        0,
+                        0,
+                        new SimpleCounter(),
+                        new SimpleCounter(),
+                        stateWriter,
+                        recoveredBuffers);
+
+        inputGate.setInputChannels(channel);
+        channel.requestSubpartitions();
+
+        return new ChannelAndSubpartition(channel, subpartition);
+    }
+
+    private static class ChannelAndSubpartition {
+        final LocalInputChannel channel;
+        final ResultSubpartition subpartition;
+
+        ChannelAndSubpartition(LocalInputChannel channel, ResultSubpartition 
subpartition) {
+            this.channel = channel;
+            this.subpartition = subpartition;
+        }
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     /** Returns the configured number of buffers for each channel in a random 
order. */

Reply via email to