This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4f192b3323891adb3b9b8dda7de62d14b901c570 Author: Rui Fan <[email protected]> AuthorDate: Thu Mar 12 23:08:13 2026 +0100 [FLINK-39018][checkpoint] Notify the downstream task of priority events even if the subpartition is blocked, so that the checkpoint barrier can be handled by the downstream task Priority events (e.g. unaligned checkpoint barriers) must notify downstream even when the subpartition is blocked. During recovery, once the upstream output channel state is fully restored, a RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event blocks the subpartition to prevent the upstream from sending new data while the downstream is still consuming recovered buffers. The subpartition remains blocked until the downstream finishes consuming all recovered buffers from every channel and calls resumeConsumption() to unblock. If a checkpoint is triggered while the downstream is still consuming recovered buffers, the upstream receives an unaligned checkpoint barrier and adds it to this blocked subpartition. The barrier must still be delivered to the downstream immediately, otherwise the checkpoint will hang until it times out. 
--- .../network/partition/PipelinedSubpartition.java | 22 ++++++- .../PipelinedSubpartitionWithReadViewTest.java | 74 ++++++++++++++++++++++ 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 2381720106a..2e4b674a3a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -252,8 +252,21 @@ public class PipelinedSubpartition extends ResultSubpartition implements Channel @GuardedBy("buffers") private boolean needNotifyPriorityEvent() { assert Thread.holdsLock(buffers); - // if subpartition is blocked then downstream doesn't expect any notifications - return buffers.getNumPriorityElements() == 1 && !isBlocked; + // Priority events (e.g. unaligned checkpoint barriers) must notify downstream even + // when the subpartition is blocked. + // + // During recovery, once the upstream output channel state is fully restored, a + // RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event + // blocks the subpartition to prevent the upstream from sending new data while the + // downstream is still consuming recovered buffers. The subpartition remains blocked + // until the downstream finishes consuming all recovered buffers from every channel + // and calls resumeConsumption() to unblock. + // + // If a checkpoint is triggered while the downstream is still consuming recovered + // buffers, the upstream receives an unaligned checkpoint barrier and adds it to this + // blocked subpartition. The barrier must still be delivered to the downstream + // immediately, otherwise the checkpoint will hang until it times out. 
+ return buffers.getNumPriorityElements() == 1; } @GuardedBy("buffers") @@ -456,7 +469,10 @@ public class PipelinedSubpartition extends ResultSubpartition implements Channel @Nullable BufferAndBacklog pollBuffer() { synchronized (buffers) { - if (isBlocked) { + // When blocked (e.g. by RECOVERY_COMPLETION event), only allow priority buffers + // (e.g. unaligned checkpoint barriers) to be polled. Regular buffers remain blocked + // until resumeConsumption() is called. See needNotifyPriorityEvent() for details. + if (isBlocked && buffers.getNumPriorityElements() == 0) { return null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java index 1eebe25273a..1705e7f520d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -545,6 +545,80 @@ class PipelinedSubpartitionWithReadViewTest { assertNoNextBuffer(readView); } + @TestTemplate + void testPriorityEventBypassesBlockedSubpartition() throws Exception { + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + + // Block the subpartition by consuming an aligned checkpoint barrier + blockSubpartitionByCheckpoint(1); + assertThat(availablityListener.getNumPriorityEvents()).isZero(); + + // While blocked, add an unaligned checkpoint barrier (priority event). + // Even though isBlocked=true, the priority event notification should NOT + // be suppressed — priority events must bypass blocking. 
+ CheckpointOptions options = + CheckpointOptions.unaligned( + CheckpointType.CHECKPOINT, + new CheckpointStorageLocationReference(new byte[] {0, 1, 2})); + BufferConsumer barrierBuffer = + EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true); + subpartition.add(barrierBuffer); + // Priority notification should fire immediately despite isBlocked=true + assertThat(availablityListener.getNumPriorityEvents()).isOne(); + + assertNextEvent( + readView, + barrierBuffer.getWrittenBytes(), + CheckpointBarrier.class, + false, + 0, + false, + true); + assertNoNextBuffer(readView); + } + + @TestTemplate + void testDataStillBlockedAfterPriorityEventBypasses() throws Exception { + final RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter(); + subpartition.setChannelStateWriter(channelStateWriter); + + // Block the subpartition + blockSubpartitionByCheckpoint(1); + subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); + assertNoNextBuffer(readView); + + // Add priority event while blocked — should notify and be pollable + CheckpointOptions options = + CheckpointOptions.unaligned( + CheckpointType.CHECKPOINT, + new CheckpointStorageLocationReference(new byte[] {0, 1, 2})); + channelStateWriter.start(0, options); + BufferConsumer barrierBuffer = + EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true); + subpartition.add(barrierBuffer); + assertThat(availablityListener.getNumPriorityEvents()).isOne(); + + // Recycle inflight buffer copies held by channel state writer + final List<Buffer> inflight = + channelStateWriter.getAddedOutput().get(subpartition.getSubpartitionInfo()); + assertThat(inflight).hasSize(1); + inflight.forEach(Buffer::recycleBuffer); + + assertNextEvent( + readView, + barrierBuffer.getWrittenBytes(), + CheckpointBarrier.class, + false, + 0, + false, + true); + assertNoNextBuffer(readView); + + // After resumeConsumption, data becomes available + readView.resumeConsumption(); 
+ assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true); + } + // ------------------------------------------------------------------------ private void blockSubpartitionByCheckpoint(int numNotifications)
