This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cebc174ad5f86716aba5b54017269e7263b6dec0 Author: Rui Fan <[email protected]> AuthorDate: Wed Feb 18 21:26:32 2026 +0100 [FLINK-39018][network] Fix LocalInputChannel priority event and buffer availability for recovered buffers --- .../partition/consumer/LocalInputChannel.java | 81 +++++++- .../partition/consumer/LocalInputChannelTest.java | 205 +++++++++++++++++++++ 2 files changed, 283 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 616071c450b..2833adecb58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -80,6 +80,13 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit private final Deque<BufferAndBacklog> toBeConsumedBuffers = new ArrayDeque<>(); + /** + * Flag indicating whether there is a pending priority event (e.g., checkpoint barrier) in the + * subpartitionView that should be consumed before toBeConsumedBuffers. This is set by {@link + * #notifyPriorityEvent} and checked in {@link #getNextBuffer()}. + */ + private volatile boolean hasPendingPriorityEvent = false; + public LocalInputChannel( SingleInputGate inputGate, int channelIndex, @@ -130,8 +137,6 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit @Override protected void requestSubpartitions() throws IOException { - checkState(toBeConsumedBuffers.isEmpty()); - boolean retriggerRequest = false; boolean notifyDataAvailable = false; @@ -242,7 +247,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit checkError(); if (!toBeConsumedBuffers.isEmpty()) { - return getBufferAndAvailability(toBeConsumedBuffers.removeFirst()); + return getNextRecoveredBuffer(); } ResultSubpartitionView subpartitionView = this.subpartitionView; @@ -304,6 +309,68 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit return getBufferAndAvailability(next); } + /** + * Consumes the next buffer from toBeConsumedBuffers (recovered buffers), handling pending + * priority events and dynamic availability detection for the last recovered buffer. + */ + private Optional<BufferAndAvailability> getNextRecoveredBuffer() throws IOException { + // If there is a pending priority event (e.g., unaligned checkpoint barrier), fetch it + // from subpartitionView first, skipping toBeConsumedBuffers. This ensures priority + // events are processed immediately even when there are pending recovered buffers. + if (hasPendingPriorityEvent) { + checkState(subpartitionView != null, "No subpartition view available"); + BufferAndBacklog next = subpartitionView.getNextBuffer(); + checkState( + next != null && next.buffer().getDataType().hasPriority(), + "Expected priority event, but got %s", + next == null ? "null" : next.buffer().getDataType()); + + // Check for barrier to update channel state persister. + // Note: maybePersist is not needed for barriers as they are not regular data buffers. + channelStatePersister.checkForBarrier(next.buffer()); + + Buffer.DataType expectedNextDataType = next.getNextDataType(); + if (!expectedNextDataType.hasPriority()) { + // Reset hasPendingPriorityEvent to false if no more priority event + hasPendingPriorityEvent = false; + if (!toBeConsumedBuffers.isEmpty()) { + // Correct nextDataType: if toBeConsumedBuffers is not empty, the actual next + // element to consume is from toBeConsumedBuffers, not from subpartitionView + expectedNextDataType = toBeConsumedBuffers.peek().buffer().getDataType(); + } + } + + return getBufferAndAvailability( + new BufferAndBacklog( + next.buffer(), + next.buffersInBacklog(), + expectedNextDataType, + next.getSequenceNumber())); + } + + BufferAndBacklog next = toBeConsumedBuffers.removeFirst(); + + // If this is the last recovered buffer and nextDataType is NONE, + // dynamically check if subpartitionView has data available. + // The last buffer's nextDataType was preset to NONE during construction, + // but subpartitionView may already have data available. + if (toBeConsumedBuffers.isEmpty() + && next.getNextDataType() == Buffer.DataType.NONE + && subpartitionView != null) { + ResultSubpartitionView.AvailabilityWithBacklog availability = + subpartitionView.getAvailabilityAndBacklog(true); + if (availability.isAvailable()) { + next = + new BufferAndBacklog( + next.buffer(), + availability.getBacklog(), + Buffer.DataType.DATA_BUFFER, + next.getSequenceNumber()); + } + } + return getBufferAndAvailability(next); + } + private Optional<BufferAndAvailability> getBufferAndAvailability(BufferAndBacklog next) throws IOException { Buffer buffer = next.buffer(); @@ -339,6 +406,14 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit notifyChannelNonEmpty(); } + @Override + public void notifyPriorityEvent(int prioritySequenceNumber) { + // Set flag so that getNextBuffer() knows to fetch priority event from subpartitionView + // before consuming toBeConsumedBuffers. + hasPendingPriorityEvent = true; + super.notifyPriorityEvent(prioritySequenceNumber); + } + private ResultSubpartitionView checkAndWaitForSubpartitionView() { // synchronizing on the request lock means this blocks until the asynchronous request // for the partition view has been completed diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 83a94e79cd1..aeb765f79b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -756,6 +756,211 @@ class LocalInputChannelTest { .containsExactly(10, 20, 30); } + @Test + void testPriorityEventConsumedBeforeRecoveredBuffers() throws Exception { + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + ChannelAndSubpartition ctx = createChannelWithRecoveredBuffers(stateWriter, 10, 20); + + // when: A priority event (barrier) arrives while recovered buffers are still pending + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, options); + ctx.subpartition.add(EventSerializer.toBufferConsumer(barrier, true)); + + ctx.channel.notifyPriorityEvent(0); + + // then: The first buffer returned should be the priority event (barrier), not recovered + // data + Optional<InputChannel.BufferAndAvailability> firstResult = ctx.channel.getNextBuffer(); + assertThat(firstResult).isPresent(); + assertThat(firstResult.get().buffer().getDataType().hasPriority()).isTrue(); + + // And the next buffers should be the recovered data + Optional<InputChannel.BufferAndAvailability> secondResult = ctx.channel.getNextBuffer(); + assertThat(secondResult).isPresent(); + assertThat(secondResult.get().buffer().isBuffer()).isTrue(); + assertThat(secondResult.get().buffer().getSize()).isEqualTo(10); + } + + @Test + void testPriorityEventFailsFastWhenSubpartitionViewIsNull() throws Exception { + // given: Local input channel with recovered buffers but NO subpartition view initialized + SingleInputGate inputGate = new SingleInputGateBuilder().build(); + + ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(10)); + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + new ResultPartitionManager(), + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + ChannelStateWriter.NO_OP, + recoveredBuffers); + + inputGate.setInputChannels(channel); + // Do NOT call channel.requestSubpartitions() — subpartitionView stays null + + channel.notifyPriorityEvent(0); + + assertThatThrownBy(channel::getNextBuffer) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No subpartition view available"); + } + + @Test + void testPriorityEventFailsFastWhenNonPriorityBufferReturned() throws Exception { + ChannelAndSubpartition ctx = + createChannelWithRecoveredBuffers(ChannelStateWriter.NO_OP, 10); + + // Add a non-priority data buffer to the subpartition + ctx.subpartition.add(createFilledFinishedBufferConsumer(32)); + ctx.channel.notifyPriorityEvent(0); + + assertThatThrownBy(ctx.channel::getNextBuffer) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Expected priority event"); + } + + @Test + void testPriorityEventFailsFastWhenSubpartitionViewReturnsNull() throws Exception { + ChannelAndSubpartition ctx = + createChannelWithRecoveredBuffers(ChannelStateWriter.NO_OP, 10); + + // Do NOT add any buffer to the subpartition — getNextBuffer() returns null + ctx.channel.notifyPriorityEvent(0); + + assertThatThrownBy(ctx.channel::getNextBuffer) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Expected priority event, but got null"); + } + + @Test + void testMultipleConsecutivePriorityEvents() throws Exception { + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + ChannelAndSubpartition ctx = createChannelWithRecoveredBuffers(stateWriter, 10); + + // Add two priority events (barriers) to the subpartition + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + ctx.subpartition.add( + EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 0L, options), true)); + ctx.subpartition.add( + EventSerializer.toBufferConsumer(new CheckpointBarrier(2L, 0L, options), true)); + + ctx.channel.notifyPriorityEvent(0); + + // First getNextBuffer() should return the first barrier + Optional<InputChannel.BufferAndAvailability> first = ctx.channel.getNextBuffer(); + assertThat(first).isPresent(); + assertThat(first.get().buffer().getDataType().hasPriority()).isTrue(); + assertThat(first.get().morePriorityEvents()).isTrue(); + + // Second getNextBuffer() should return the second barrier + Optional<InputChannel.BufferAndAvailability> second = ctx.channel.getNextBuffer(); + assertThat(second).isPresent(); + assertThat(second.get().buffer().getDataType().hasPriority()).isTrue(); + + // Third getNextBuffer() should return the recovered data buffer + Optional<InputChannel.BufferAndAvailability> third = ctx.channel.getNextBuffer(); + assertThat(third).isPresent(); + assertThat(third.get().buffer().isBuffer()).isTrue(); + assertThat(third.get().buffer().getSize()).isEqualTo(10); + } + + @Test + void testNextDataTypeCorrectedToRecoveredBufferType() throws Exception { + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + ChannelAndSubpartition ctx = createChannelWithRecoveredBuffers(stateWriter, 10); + + // Add a priority event followed by a data buffer in the subpartition + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + ctx.subpartition.add( + EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 0L, options), true)); + ctx.subpartition.add(createFilledFinishedBufferConsumer(32)); + + ctx.channel.notifyPriorityEvent(0); + + // getNextBuffer() returns the barrier + Optional<InputChannel.BufferAndAvailability> result = ctx.channel.getNextBuffer(); + assertThat(result).isPresent(); + assertThat(result.get().buffer().getDataType().hasPriority()).isTrue(); + + // The nextDataType should be corrected to DATA_BUFFER (from toBeConsumedBuffers), + // not whatever the subpartitionView reports. + assertThat(result.get().morePriorityEvents()).isFalse(); + assertThat(result.get().moreAvailable()).isTrue(); + + // The next buffer should be the recovered data (not the subpartition data) + Optional<InputChannel.BufferAndAvailability> next = ctx.channel.getNextBuffer(); + assertThat(next).isPresent(); + assertThat(next.get().buffer().isBuffer()).isTrue(); + assertThat(next.get().buffer().getSize()).isEqualTo(10); + } + + /** + * Creates a LocalInputChannel with recovered buffers and a live subpartition, ready for + * priority event tests. The channel has already called requestSubpartitions(). + */ + private static ChannelAndSubpartition createChannelWithRecoveredBuffers( + ChannelStateWriter stateWriter, int... recoveredBufferSizes) throws Exception { + SingleInputGate inputGate = new SingleInputGateBuilder().build(); + + PipelinedResultPartition parent = + (PipelinedResultPartition) + PartitionTestUtils.createPartition( + ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE); + ResultSubpartition subpartition = parent.getAllPartitions()[0]; + ResultSubpartitionView subpartitionView = + subpartition.createReadView((ResultSubpartitionView view) -> {}); + + TestingResultPartitionManager partitionManager = + new TestingResultPartitionManager(subpartitionView); + + ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>(); + for (int size : recoveredBufferSizes) { + recoveredBuffers.add(TestBufferFactory.createBuffer(size)); + } + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + parent.getPartitionId(), + new ResultSubpartitionIndexSet(0), + partitionManager, + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + stateWriter, + recoveredBuffers); + + inputGate.setInputChannels(channel); + channel.requestSubpartitions(); + + return new ChannelAndSubpartition(channel, subpartition); + } + + private static class ChannelAndSubpartition { + final LocalInputChannel channel; + final ResultSubpartition subpartition; + + ChannelAndSubpartition(LocalInputChannel channel, ResultSubpartition subpartition) { + this.channel = channel; + this.subpartition = subpartition; + } + } + // --------------------------------------------------------------------------------------------- /** Returns the configured number of buffers for each channel in a random order. */
