This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f192b3323891adb3b9b8dda7de62d14b901c570
Author: Rui Fan <[email protected]>
AuthorDate: Thu Mar 12 23:08:13 2026 +0100

    [FLINK-39018][checkpoint] Notify PriorityEvent to downstream task even if it is blocked to ensure the checkpoint barrier can be handled by downstream task
    
    Priority events (e.g. unaligned checkpoint barriers) must notify downstream even
    when the subpartition is blocked.
    
    During recovery, once the upstream output channel state is fully restored, a
    RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event
    blocks the subpartition to prevent the upstream from sending new data while the
    downstream is still consuming recovered buffers. The subpartition remains blocked
    until the downstream finishes consuming all recovered buffers from every channel
    and calls resumeConsumption() to unblock.
    
    If a checkpoint is triggered while the downstream is still consuming recovered
    buffers, the upstream receives an unaligned checkpoint barrier and adds it to this
    blocked subpartition. The barrier must still be delivered to the downstream
    immediately, otherwise the checkpoint will hang until it times out.
---
 .../network/partition/PipelinedSubpartition.java   | 22 ++++++-
 .../PipelinedSubpartitionWithReadViewTest.java     | 74 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2381720106a..2e4b674a3a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -252,8 +252,21 @@ public class PipelinedSubpartition extends 
ResultSubpartition implements Channel
     @GuardedBy("buffers")
     private boolean needNotifyPriorityEvent() {
         assert Thread.holdsLock(buffers);
-        // if subpartition is blocked then downstream doesn't expect any 
notifications
-        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+        // Priority events (e.g. unaligned checkpoint barriers) must notify 
downstream even
+        // when the subpartition is blocked.
+        //
+        // During recovery, once the upstream output channel state is fully 
restored, a
+        // RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is 
emitted. This event
+        // blocks the subpartition to prevent the upstream from sending new 
data while the
+        // downstream is still consuming recovered buffers. The subpartition 
remains blocked
+        // until the downstream finishes consuming all recovered buffers from 
every channel
+        // and calls resumeConsumption() to unblock.
+        //
+        // If a checkpoint is triggered while the downstream is still 
consuming recovered
+        // buffers, the upstream receives an unaligned checkpoint barrier and 
adds it to this
+        // blocked subpartition. The barrier must still be delivered to the 
downstream
+        // immediately, otherwise the checkpoint will hang until it times out.
+        return buffers.getNumPriorityElements() == 1;
     }
 
     @GuardedBy("buffers")
@@ -456,7 +469,10 @@ public class PipelinedSubpartition extends 
ResultSubpartition implements Channel
     @Nullable
     BufferAndBacklog pollBuffer() {
         synchronized (buffers) {
-            if (isBlocked) {
+            // When blocked (e.g. by RECOVERY_COMPLETION event), only allow 
priority buffers
+            // (e.g. unaligned checkpoint barriers) to be polled. Regular 
buffers remain blocked
+            // until resumeConsumption() is called. See 
needNotifyPriorityEvent() for details.
+            if (isBlocked && buffers.getNumPriorityElements() == 0) {
                 return null;
             }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 1eebe25273a..1705e7f520d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -545,6 +545,80 @@ class PipelinedSubpartitionWithReadViewTest {
         assertNoNextBuffer(readView);
     }
 
+    @TestTemplate
+    void testPriorityEventBypassesBlockedSubpartition() throws Exception {
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+
+        // Block the subpartition by consuming an aligned checkpoint barrier
+        blockSubpartitionByCheckpoint(1);
+        assertThat(availablityListener.getNumPriorityEvents()).isZero();
+
+        // While blocked, add an unaligned checkpoint barrier (priority event).
+        // Even though isBlocked=true, the priority event notification should 
NOT
+        // be suppressed — priority events must bypass blocking.
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
+                        new CheckpointStorageLocationReference(new byte[] {0, 
1, 2}));
+        BufferConsumer barrierBuffer =
+                EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, 
options), true);
+        subpartition.add(barrierBuffer);
+        // Priority notification should fire immediately despite isBlocked=true
+        assertThat(availablityListener.getNumPriorityEvents()).isOne();
+
+        assertNextEvent(
+                readView,
+                barrierBuffer.getWrittenBytes(),
+                CheckpointBarrier.class,
+                false,
+                0,
+                false,
+                true);
+        assertNoNextBuffer(readView);
+    }
+
+    @TestTemplate
+    void testDataStillBlockedAfterPriorityEventBypasses() throws Exception {
+        final RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
+        subpartition.setChannelStateWriter(channelStateWriter);
+
+        // Block the subpartition
+        blockSubpartitionByCheckpoint(1);
+        subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
+        assertNoNextBuffer(readView);
+
+        // Add priority event while blocked — should notify and be pollable
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
+                        new CheckpointStorageLocationReference(new byte[] {0, 
1, 2}));
+        channelStateWriter.start(0, options);
+        BufferConsumer barrierBuffer =
+                EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, 
options), true);
+        subpartition.add(barrierBuffer);
+        assertThat(availablityListener.getNumPriorityEvents()).isOne();
+
+        // Recycle inflight buffer copies held by channel state writer
+        final List<Buffer> inflight =
+                
channelStateWriter.getAddedOutput().get(subpartition.getSubpartitionInfo());
+        assertThat(inflight).hasSize(1);
+        inflight.forEach(Buffer::recycleBuffer);
+
+        assertNextEvent(
+                readView,
+                barrierBuffer.getWrittenBytes(),
+                CheckpointBarrier.class,
+                false,
+                0,
+                false,
+                true);
+        assertNoNextBuffer(readView);
+
+        // After resumeConsumption, data becomes available
+        readView.resumeConsumption();
+        assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
+    }
+
     // ------------------------------------------------------------------------
 
     private void blockSubpartitionByCheckpoint(int numNotifications)

Reply via email to