This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1b933bd5a26c8f7a424e278e7af0bff66f3af2db Author: Rui Fan <[email protected]> AuthorDate: Fri Mar 27 14:40:18 2026 +0100 [FLINK-38930][checkpoint] Use heap buffer fallback to avoid deadlock during recovery When unaligned checkpointing during recovery is enabled, use a heap buffer as a fallback instead of blocking on the buffer pool, to avoid hanging when no buffer is currently available from the buffer pool. When the feature is disabled, the original blocking behavior is preserved. --- .../partition/consumer/IndexedInputGate.java | 6 +++++ .../partition/consumer/RecoveredInputChannel.java | 26 +++++++++++++++++++++- .../partition/consumer/SingleInputGate.java | 12 ++++++++++ .../runtime/taskmanager/InputGateWithMetrics.java | 10 +++++++++ .../flink/streaming/runtime/tasks/StreamTask.java | 6 +++++ .../streaming/runtime/io/MockIndexedInputGate.java | 8 +++++++ .../flink/streaming/runtime/io/MockInputGate.java | 8 +++++++ .../AlignedCheckpointsMassiveRandomTest.java | 8 +++++++ 8 files changed, 83 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java index 5daa277cd9b..915012924d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java @@ -74,4 +74,10 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab public abstract ResultPartitionType getConsumedPartitionType(); public abstract void triggerDebloating(); + + /** Sets whether unaligned checkpointing during recovery is enabled. */ + public abstract void setCheckpointingDuringRecoveryEnabled(boolean enabled); + + /** Returns whether unaligned checkpointing during recovery is enabled.
*/ + public abstract boolean isCheckpointingDuringRecoveryEnabled(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java index 1d31a65f49a..e809e952a28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; @@ -27,10 +29,13 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger; import org.apache.flink.runtime.io.network.partition.ChannelStateHolder; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; +import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -281,7 +286,26 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan bufferManager.requestExclusiveBuffers(networkBuffersPerChannel); 
exclusiveBuffersAssigned = true; } - return bufferManager.requestBufferBlocking(); + if (!inputGate.isCheckpointingDuringRecoveryEnabled()) { + // When checkpoint-during-recovery is not enabled, the original blocking allocation + // is used as-is — no heap buffer fallback, no behavior change from the legacy path. + return bufferManager.requestBufferBlocking(); + } + // Use heap buffer fallback to avoid deadlock during filtering recovery: the filtering + // thread first requests buffers to read state (pre-filter), then requests more buffers + // to write filtered output (post-filter). If pre-filter buffers exhaust the pool, + // post-filter allocation blocks, stalling the thread so pre-filter buffers can never + // be consumed and released — the thread deadlocks itself. Heap buffers bypass the pool + // so post-filter writes always proceed. Both call sites (getBuffer and filterAndRewrite) + // go through this method, so the fallback applies uniformly. + // TODO: replace heap fallback with disk spilling to bound memory usage in FLINK-38544. 
+ Buffer buffer = bufferManager.requestBuffer(); + if (buffer != null) { + return buffer; + } + MemorySegment memorySegment = + MemorySegmentFactory.allocateUnpooledSegment(MemoryManager.DEFAULT_PAGE_SIZE); + return new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index a0bbb4e7b9e..d845eef6294 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -243,6 +243,8 @@ public class SingleInputGate extends IndexedInputGate { */ private final int[] endOfPartitions; + private volatile boolean checkpointingDuringRecoveryEnabled = false; + public SingleInputGate( String owningTaskName, int gateIndex, @@ -329,6 +331,16 @@ public class SingleInputGate extends IndexedInputGate { } } + @Override + public void setCheckpointingDuringRecoveryEnabled(boolean enabled) { + this.checkpointingDuringRecoveryEnabled = enabled; + } + + @Override + public boolean isCheckpointingDuringRecoveryEnabled() { + return checkpointingDuringRecoveryEnabled; + } + @Override public void requestPartitions() { synchronized (requestLock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index 828d8e2ca3c..31775fb21de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -160,6 +160,16 @@ public class InputGateWithMetrics extends IndexedInputGate { inputGate.finishReadRecoveredState(); } + 
@Override + public void setCheckpointingDuringRecoveryEnabled(boolean enabled) { + inputGate.setCheckpointingDuringRecoveryEnabled(enabled); + } + + @Override + public boolean isCheckpointingDuringRecoveryEnabled() { + return inputGate.isCheckpointingDuringRecoveryEnabled(); + } + private BufferOrEvent updateMetrics(BufferOrEvent bufferOrEvent) { int incomingDataSize = bufferOrEvent.getSize(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 27823f7137f..9ec03137842 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -881,6 +881,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs); IndexedInputGate[] inputGates = getEnvironment().getAllInputGates(); + boolean checkpointingDuringRecoveryEnabled = + CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration()); + for (IndexedInputGate inputGate : inputGates) { + inputGate.setCheckpointingDuringRecoveryEnabled(checkpointingDuringRecoveryEnabled); + } + channelIOExecutor.execute( () -> { try { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java index e0b2a3f8bfa..618f31e518f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java @@ -137,4 +137,12 @@ public class MockIndexedInputGate extends IndexedInputGate { @Override public void triggerDebloating() {} + + @Override + public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {} + + 
@Override + public boolean isCheckpointingDuringRecoveryEnabled() { + return false; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index fb9b3c88e07..a35c8995d2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -199,4 +199,12 @@ public class MockInputGate extends IndexedInputGate { public List<InputChannelInfo> getUnfinishedChannels() { return Collections.emptyList(); } + + @Override + public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {} + + @Override + public boolean isCheckpointingDuringRecoveryEnabled() { + return false; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java index 16a30fe3cbb..da3d2551934 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java @@ -281,5 +281,13 @@ class AlignedCheckpointsMassiveRandomTest { public List<InputChannelInfo> getUnfinishedChannels() { return Collections.emptyList(); } + + @Override + public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {} + + @Override + public boolean isCheckpointingDuringRecoveryEnabled() { + return false; + } } }
