This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9ce47b215fc0ceec92d90c5c719de7d58befb28b Author: Rui Fan <[email protected]> AuthorDate: Wed Feb 18 21:26:24 2026 +0100 [hotfix][network] Fix LocalInputChannel.getBuffersInUseCount to include toBeConsumedBuffers --- .../partition/consumer/LocalInputChannel.java | 14 +++++-- .../partition/consumer/LocalInputChannelTest.java | 49 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 5e90ed4625a..a7628d8c52f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -402,6 +402,13 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit view.releaseAllResources(); subpartitionView = null; } + + // Release any remaining buffers in toBeConsumedBuffers to avoid memory leak. + // These may be recovered buffers or partial buffers from FullyFilledBuffer. + for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) { + bufferAndBacklog.buffer().recycleBuffer(); + } + toBeConsumedBuffers.clear(); } } @@ -418,18 +425,19 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit @Override int getBuffersInUseCount() { ResultSubpartitionView view = this.subpartitionView; - return view == null ? 0 : view.getNumberOfQueuedBuffers(); + return toBeConsumedBuffers.size() + (view == null ? 0 : view.getNumberOfQueuedBuffers()); } @Override public int unsynchronizedGetNumberOfQueuedBuffers() { ResultSubpartitionView view = subpartitionView; + int count = toBeConsumedBuffers.size(); if (view != null) { - return view.unsynchronizedGetNumberOfQueuedBuffers(); + count += view.unsynchronizedGetNumberOfQueuedBuffers(); } - return 0; + return count; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 298987c83f1..2e0be6ae7b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.disk.NoOpFileChannelManager; @@ -59,6 +61,7 @@ import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -664,6 +667,52 @@ class LocalInputChannelTest { assertThat(localChannel.getBuffersInUseCount()).isEqualTo(3); } + @Test + void testGetBuffersInUseCountIncludesToBeConsumedBuffers() throws Exception { + // given: Local input channel with recovered buffers in toBeConsumedBuffers + ResultSubpartitionView subpartitionView = + InputChannelTestUtils.createResultSubpartitionView( + createFilledFinishedBufferConsumer(4096), + createFilledFinishedBufferConsumer(4096)); + TestingResultPartitionManager partitionManager = + new TestingResultPartitionManager(subpartitionView); + final SingleInputGate inputGate = createSingleInputGate(1); + + // Create 3 recovered buffers + ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(32)); + recoveredBuffers.add(TestBufferFactory.createBuffer(32)); + recoveredBuffers.add(TestBufferFactory.createBuffer(32)); + + final LocalInputChannel localChannel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + partitionManager, + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + ChannelStateWriter.NO_OP, + recoveredBuffers); + + inputGate.setInputChannels(localChannel); + + // then: Before requesting subpartitions, buffers in use should include recovered buffers + assertThat(localChannel.getBuffersInUseCount()).isEqualTo(3); + assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(3); + + // when: The subpartition view is initialized (adds 2 more buffers from the view) + localChannel.requestSubpartitions(); + + // then: Buffers in use should include both recovered and subpartition view buffers + assertThat(localChannel.getBuffersInUseCount()).isEqualTo(5); + assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5); + } + // --------------------------------------------------------------------------------------------- /** Returns the configured number of buffers for each channel in a random order. */
