This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ce47b215fc0ceec92d90c5c719de7d58befb28b
Author: Rui Fan <[email protected]>
AuthorDate: Wed Feb 18 21:26:24 2026 +0100

    [hotfix][network] Fix LocalInputChannel.getBuffersInUseCount to include toBeConsumedBuffers
---
 .../partition/consumer/LocalInputChannel.java      | 14 +++++--
 .../partition/consumer/LocalInputChannelTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 5e90ed4625a..a7628d8c52f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -402,6 +402,13 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
                 view.releaseAllResources();
                 subpartitionView = null;
             }
+
+            // Release any remaining buffers in toBeConsumedBuffers to avoid memory leak.
+            // These may be recovered buffers or partial buffers from FullyFilledBuffer.
+            for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) {
+                bufferAndBacklog.buffer().recycleBuffer();
+            }
+            toBeConsumedBuffers.clear();
         }
     }
 
@@ -418,18 +425,19 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
     @Override
     int getBuffersInUseCount() {
         ResultSubpartitionView view = this.subpartitionView;
-        return view == null ? 0 : view.getNumberOfQueuedBuffers();
+        return toBeConsumedBuffers.size() + (view == null ? 0 : view.getNumberOfQueuedBuffers());
     }
 
     @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {
         ResultSubpartitionView view = subpartitionView;
 
+        int count = toBeConsumedBuffers.size();
         if (view != null) {
-            return view.unsynchronizedGetNumberOfQueuedBuffers();
+            count += view.unsynchronizedGetNumberOfQueuedBuffers();
         }
 
-        return 0;
+        return count;
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 298987c83f1..2e0be6ae7b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
@@ -59,6 +61,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -664,6 +667,52 @@ class LocalInputChannelTest {
         assertThat(localChannel.getBuffersInUseCount()).isEqualTo(3);
     }
 
+    @Test
+    void testGetBuffersInUseCountIncludesToBeConsumedBuffers() throws Exception {
+        // given: Local input channel with recovered buffers in toBeConsumedBuffers
+        ResultSubpartitionView subpartitionView =
+                InputChannelTestUtils.createResultSubpartitionView(
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096));
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(subpartitionView);
+        final SingleInputGate inputGate = createSingleInputGate(1);
+
+        // Create 3 recovered buffers
+        ArrayDeque<Buffer> recoveredBuffers = new ArrayDeque<>();
+        recoveredBuffers.add(TestBufferFactory.createBuffer(32));
+        recoveredBuffers.add(TestBufferFactory.createBuffer(32));
+        recoveredBuffers.add(TestBufferFactory.createBuffer(32));
+
+        final LocalInputChannel localChannel =
+                new LocalInputChannel(
+                        inputGate,
+                        0,
+                        new ResultPartitionID(),
+                        new ResultSubpartitionIndexSet(0),
+                        partitionManager,
+                        new TaskEventDispatcher(),
+                        0,
+                        0,
+                        new SimpleCounter(),
+                        new SimpleCounter(),
+                        ChannelStateWriter.NO_OP,
+                        recoveredBuffers);
+
+        inputGate.setInputChannels(localChannel);
+
+        // then: Before requesting subpartitions, buffers in use should include recovered buffers
+        assertThat(localChannel.getBuffersInUseCount()).isEqualTo(3);
+        assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(3);
+
+        // when: The subpartition view is initialized (adds 2 more buffers from the view)
+        localChannel.requestSubpartitions();
+
+        // then: Buffers in use should include both recovered and subpartition view buffers
+        assertThat(localChannel.getBuffersInUseCount()).isEqualTo(5);
+        assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5);
+    }
+
     // ---------------------------------------------------------------------------------------------
 
     /** Returns the configured number of buffers for each channel in a random order. */

Reply via email to