This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b933bd5a26c8f7a424e278e7af0bff66f3af2db
Author: Rui Fan <[email protected]>
AuthorDate: Fri Mar 27 14:40:18 2026 +0100

    [FLINK-38930][checkpoint] Use heap buffer fallback to avoid deadlock during recovery
    
    When unaligned checkpointing during recovery is enabled, fall back to an
    unpooled heap buffer instead of blocking on the buffer pool, to avoid a
    deadlock when the pool has no free buffer. When the feature is disabled,
    the original blocking behavior is preserved.
---
 .../partition/consumer/IndexedInputGate.java       |  6 +++++
 .../partition/consumer/RecoveredInputChannel.java  | 26 +++++++++++++++++++++-
 .../partition/consumer/SingleInputGate.java        | 12 ++++++++++
 .../runtime/taskmanager/InputGateWithMetrics.java  | 10 +++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  6 +++++
 .../streaming/runtime/io/MockIndexedInputGate.java |  8 +++++++
 .../flink/streaming/runtime/io/MockInputGate.java  |  8 +++++++
 .../AlignedCheckpointsMassiveRandomTest.java       |  8 +++++++
 8 files changed, 83 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 5daa277cd9b..915012924d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -74,4 +74,10 @@ public abstract class IndexedInputGate extends InputGate 
implements Checkpointab
     public abstract ResultPartitionType getConsumedPartitionType();
 
     public abstract void triggerDebloating();
+
+    /** Sets whether unaligned checkpointing during recovery is enabled. */
+    public abstract void setCheckpointingDuringRecoveryEnabled(boolean 
enabled);
+
+    /** Returns whether unaligned checkpointing during recovery is enabled. */
+    public abstract boolean isCheckpointingDuringRecoveryEnabled();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 1d31a65f49a..e809e952a28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
@@ -27,10 +29,13 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -281,7 +286,26 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
             bufferManager.requestExclusiveBuffers(networkBuffersPerChannel);
             exclusiveBuffersAssigned = true;
         }
-        return bufferManager.requestBufferBlocking();
+        if (!inputGate.isCheckpointingDuringRecoveryEnabled()) {
+            // When checkpoint-during-recovery is not enabled, the original 
blocking allocation
+            // is used as-is — no heap buffer fallback, no behavior change 
from the legacy path.
+            return bufferManager.requestBufferBlocking();
+        }
+        // Use heap buffer fallback to avoid deadlock during filtering 
recovery: the filtering
+        // thread first requests buffers to read state (pre-filter), then 
requests more buffers
+        // to write filtered output (post-filter). If pre-filter buffers 
exhaust the pool,
+        // post-filter allocation blocks, stalling the thread so pre-filter 
buffers can never
+        // be consumed and released — the thread deadlocks itself. Heap 
buffers bypass the pool
+        // so post-filter writes always proceed. Both call sites (getBuffer 
and filterAndRewrite)
+        // go through this method, so the fallback applies uniformly.
+        // TODO: replace heap fallback with disk spilling to bound memory 
usage in FLINK-38544.
+        Buffer buffer = bufferManager.requestBuffer();
+        if (buffer != null) {
+            return buffer;
+        }
+        MemorySegment memorySegment =
+                
MemorySegmentFactory.allocateUnpooledSegment(MemoryManager.DEFAULT_PAGE_SIZE);
+        return new NetworkBuffer(memorySegment, 
FreeingBufferRecycler.INSTANCE);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index a0bbb4e7b9e..d845eef6294 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -243,6 +243,8 @@ public class SingleInputGate extends IndexedInputGate {
      */
     private final int[] endOfPartitions;
 
+    private volatile boolean checkpointingDuringRecoveryEnabled = false;
+
     public SingleInputGate(
             String owningTaskName,
             int gateIndex,
@@ -329,6 +331,16 @@ public class SingleInputGate extends IndexedInputGate {
         }
     }
 
+    @Override
+    public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {
+        this.checkpointingDuringRecoveryEnabled = enabled;
+    }
+
+    @Override
+    public boolean isCheckpointingDuringRecoveryEnabled() {
+        return checkpointingDuringRecoveryEnabled;
+    }
+
     @Override
     public void requestPartitions() {
         synchronized (requestLock) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 828d8e2ca3c..31775fb21de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -160,6 +160,16 @@ public class InputGateWithMetrics extends IndexedInputGate 
{
         inputGate.finishReadRecoveredState();
     }
 
+    @Override
+    public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {
+        inputGate.setCheckpointingDuringRecoveryEnabled(enabled);
+    }
+
+    @Override
+    public boolean isCheckpointingDuringRecoveryEnabled() {
+        return inputGate.isCheckpointingDuringRecoveryEnabled();
+    }
+
     private BufferOrEvent updateMetrics(BufferOrEvent bufferOrEvent) {
         int incomingDataSize = bufferOrEvent.getSize();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 27823f7137f..9ec03137842 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -881,6 +881,12 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                 INITIALIZE_STATE_DURATION, initializeStateEndTs - 
readOutputDataTs);
         IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
 
+        boolean checkpointingDuringRecoveryEnabled =
+                
CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
+        for (IndexedInputGate inputGate : inputGates) {
+            
inputGate.setCheckpointingDuringRecoveryEnabled(checkpointingDuringRecoveryEnabled);
+        }
+
         channelIOExecutor.execute(
                 () -> {
                     try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index e0b2a3f8bfa..618f31e518f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -137,4 +137,12 @@ public class MockIndexedInputGate extends IndexedInputGate 
{
 
     @Override
     public void triggerDebloating() {}
+
+    @Override
+    public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {}
+
+    @Override
+    public boolean isCheckpointingDuringRecoveryEnabled() {
+        return false;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index fb9b3c88e07..a35c8995d2a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -199,4 +199,12 @@ public class MockInputGate extends IndexedInputGate {
     public List<InputChannelInfo> getUnfinishedChannels() {
         return Collections.emptyList();
     }
+
+    @Override
+    public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {}
+
+    @Override
+    public boolean isCheckpointingDuringRecoveryEnabled() {
+        return false;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index 16a30fe3cbb..da3d2551934 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -281,5 +281,13 @@ class AlignedCheckpointsMassiveRandomTest {
         public List<InputChannelInfo> getUnfinishedChannels() {
             return Collections.emptyList();
         }
+
+        @Override
+        public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {}
+
+        @Override
+        public boolean isCheckpointingDuringRecoveryEnabled() {
+            return false;
+        }
     }
 }

Reply via email to