This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38f7c593538d78e67ed75f006826b9fc3e97826e
Author: kevin.cyj <kevin....@alibaba-inc.com>
AuthorDate: Wed Dec 22 17:02:27 2021 +0800

    [hotfix] Rename some methods of NetworkBufferPool and add more comments for better readability
    
    This closes #18173.
---
 .../flink/core/memory/MemorySegmentProvider.java   |  4 +-
 .../runtime/io/network/buffer/LocalBufferPool.java |  6 +-
 .../io/network/buffer/NetworkBufferPool.java       | 35 ++++++--
 .../network/partition/consumer/BufferManager.java  |  7 +-
 .../io/network/buffer/BufferPoolFactoryTest.java   | 12 +--
 .../io/network/buffer/LocalBufferPoolTest.java     |  4 +-
 .../io/network/buffer/NetworkBufferPoolTest.java   | 93 +++++++++++-----------
 .../network/partition/InputChannelTestUtils.java   | 10 ++-
 8 files changed, 100 insertions(+), 71 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
index c5fa945..265435d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
@@ -23,8 +23,8 @@ import java.util.Collection;
 
 /** The provider used for requesting and releasing batch of memory segments. */
 public interface MemorySegmentProvider {
-    Collection<MemorySegment> requestMemorySegments(int 
numberOfSegmentsToRequest)
+    Collection<MemorySegment> requestUnpooledMemorySegments(int 
numberOfSegmentsToRequest)
             throws IOException;
 
-    void recycleMemorySegments(Collection<MemorySegment> segments) throws 
IOException;
+    void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) 
throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 7d2003c..b08bcea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -244,7 +244,7 @@ class LocalBufferPool implements BufferPool {
 
             if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
                 availableMemorySegments.addAll(
-                        networkBufferPool.requestMemorySegmentsBlocking(
+                        networkBufferPool.requestPooledMemorySegmentsBlocking(
                                 numberOfSegmentsToReserve - 
numberOfRequestedMemorySegments));
                 toNotify = availabilityHelper.getUnavailableToResetAvailable();
             }
@@ -408,7 +408,7 @@ class LocalBufferPool implements BufferPool {
                 !isDestroyed,
                 "Destroyed buffer pools should never acquire segments - this 
will lead to buffer leaks.");
 
-        MemorySegment segment = networkBufferPool.requestMemorySegment();
+        MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
         if (segment != null) {
             availableMemorySegments.add(segment);
             numberOfRequestedMemorySegments++;
@@ -652,7 +652,7 @@ class LocalBufferPool implements BufferPool {
         assert Thread.holdsLock(availableMemorySegments);
 
         numberOfRequestedMemorySegments--;
-        networkBufferPool.recycle(segment);
+        networkBufferPool.recyclePooledMemorySegment(segment);
     }
 
     private void returnExcessMemorySegments() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 945e37f..509db03 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.AvailabilityProvider;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
@@ -149,27 +150,47 @@ public class NetworkBufferPool
                 segmentSize);
     }
 
+    /**
+     * Unlike {@link #requestUnpooledMemorySegments}, which allocates unpooled segments, this
+     * method and {@link #requestPooledMemorySegmentsBlocking} below are designed to be used from
+     * {@link LocalBufferPool} for pooled memory segment allocation. Note that the methods for
+     * requesting and recycling pooled memory segments must not acquire the factoryLock, in order
+     * to avoid deadlock.
+     */
     @Nullable
-    public MemorySegment requestMemorySegment() {
+    public MemorySegment requestPooledMemorySegment() {
         synchronized (availableMemorySegments) {
             return internalRequestMemorySegment();
         }
     }
 
-    public List<MemorySegment> requestMemorySegmentsBlocking(int 
numberOfSegmentsToRequest)
+    public List<MemorySegment> requestPooledMemorySegmentsBlocking(int 
numberOfSegmentsToRequest)
             throws IOException {
         return internalRequestMemorySegments(numberOfSegmentsToRequest);
     }
 
-    public void recycle(MemorySegment segment) {
+    /**
+     * Corresponding to {@link #requestPooledMemorySegmentsBlocking} and {@link
+     * #requestPooledMemorySegment}, this method is for pooled memory segments 
recycling.
+     */
+    public void recyclePooledMemorySegment(MemorySegment segment) {
         // Adds the segment back to the queue, which does not immediately free 
the memory
         // however, since this happens when references to the global pool are 
also released,
         // making the availableMemorySegments queue and its contained object 
reclaimable
         
internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment)));
     }
 
+    /**
+     * Unpooled memory segments are requested directly from {@link 
NetworkBufferPool}, as opposed to
+     * pooled segments, that are requested through {@link BufferPool} that was 
created from this
+     * {@link NetworkBufferPool} (see {@link #createBufferPool}). They are 
used for example for
+     * exclusive {@link RemoteInputChannel} credits, that are permanently 
assigned to that channel,
+     * and never returned to any {@link BufferPool}. As opposed to pooled 
segments, when requested,
+     * unpooled segments need to be accounted against {@link #numTotalRequiredBuffers}, which might
+     * require redistribution of the segments.
+     */
     @Override
-    public List<MemorySegment> requestMemorySegments(int 
numberOfSegmentsToRequest)
+    public List<MemorySegment> requestUnpooledMemorySegments(int 
numberOfSegmentsToRequest)
             throws IOException {
         checkArgument(
                 numberOfSegmentsToRequest >= 0,
@@ -245,8 +266,12 @@ public class NetworkBufferPool
         return segment;
     }
 
+    /**
+     * Corresponding to {@link #requestUnpooledMemorySegments}, this method is 
for unpooled memory
+     * segments recycling.
+     */
     @Override
-    public void recycleMemorySegments(Collection<MemorySegment> segments) {
+    public void recycleUnpooledMemorySegments(Collection<MemorySegment> 
segments) {
         recycleMemorySegments(segments, segments.size());
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index 41eb12f..db38025 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -139,7 +139,8 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
             return;
         }
 
-        Collection<MemorySegment> segments = 
globalPool.requestMemorySegments(numExclusiveBuffers);
+        Collection<MemorySegment> segments =
+                globalPool.requestUnpooledMemorySegments(numExclusiveBuffers);
         synchronized (bufferQueue) {
             // AvailableBufferQueue::addExclusiveBuffer may release the 
previously allocated
             // floating buffer, which requires the caller to recycle these 
released floating
@@ -213,7 +214,7 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
                 // Similar to notifyBufferAvailable(), make sure that we never 
add a buffer
                 // after channel released all buffers via 
releaseAllResources().
                 if (inputChannel.isReleased()) {
-                    
globalPool.recycleMemorySegments(Collections.singletonList(segment));
+                    
globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
                     return;
                 } else {
                     releasedFloatingBuffer =
@@ -280,7 +281,7 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
         }
         try {
             if (exclusiveRecyclingSegments.size() > 0) {
-                globalPool.recycleMemorySegments(exclusiveRecyclingSegments);
+                
globalPool.recycleUnpooledMemorySegments(exclusiveRecyclingSegments);
             }
         } catch (Exception e) {
             err = firstOrSuppressed(e, err);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index a305cfa2..19bd2e7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -317,7 +317,7 @@ public class BufferPoolFactoryTest {
             BufferPool first = globalPool.createBufferPool(1, 10);
             assertEquals(10, first.getNumBuffers());
 
-            List<MemorySegment> segmentList1 = 
globalPool.requestMemorySegments(2);
+            List<MemorySegment> segmentList1 = 
globalPool.requestUnpooledMemorySegments(2);
             assertEquals(2, segmentList1.size());
             assertEquals(8, first.getNumBuffers());
 
@@ -325,12 +325,12 @@ public class BufferPoolFactoryTest {
             assertEquals(4, first.getNumBuffers());
             assertEquals(4, second.getNumBuffers());
 
-            List<MemorySegment> segmentList2 = 
globalPool.requestMemorySegments(2);
+            List<MemorySegment> segmentList2 = 
globalPool.requestUnpooledMemorySegments(2);
             assertEquals(2, segmentList2.size());
             assertEquals(3, first.getNumBuffers());
             assertEquals(3, second.getNumBuffers());
 
-            List<MemorySegment> segmentList3 = 
globalPool.requestMemorySegments(2);
+            List<MemorySegment> segmentList3 = 
globalPool.requestUnpooledMemorySegments(2);
             assertEquals(2, segmentList3.size());
             assertEquals(2, first.getNumBuffers());
             assertEquals(2, second.getNumBuffers());
@@ -339,17 +339,17 @@ public class BufferPoolFactoryTest {
                     "Wrong number of available segments after creating buffer 
pools and requesting segments.";
             assertEquals(msg, 2, 
globalPool.getNumberOfAvailableMemorySegments());
 
-            globalPool.recycleMemorySegments(segmentList1);
+            globalPool.recycleUnpooledMemorySegments(segmentList1);
             assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
             assertEquals(3, first.getNumBuffers());
             assertEquals(3, second.getNumBuffers());
 
-            globalPool.recycleMemorySegments(segmentList2);
+            globalPool.recycleUnpooledMemorySegments(segmentList2);
             assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
             assertEquals(4, first.getNumBuffers());
             assertEquals(4, second.getNumBuffers());
 
-            globalPool.recycleMemorySegments(segmentList3);
+            globalPool.recycleUnpooledMemorySegments(segmentList3);
             assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
             assertEquals(5, first.getNumBuffers());
             assertEquals(5, second.getNumBuffers());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index ff72fbc..f77275a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -547,11 +547,11 @@ public class LocalBufferPoolTest extends TestLogger {
 
         @Nullable
         @Override
-        public MemorySegment requestMemorySegment() {
+        public MemorySegment requestPooledMemorySegment() {
             if (requestCounter++ == 1) {
                 return null;
             }
-            return super.requestMemorySegment();
+            return super.requestPooledMemorySegment();
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index c0e9b90..809b5d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -129,7 +129,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 
bufferSize);
 
-        MemorySegment segment = globalPool.requestMemorySegment();
+        MemorySegment segment = globalPool.requestPooledMemorySegment();
         assertThat(segment, is(notNullValue()));
 
         assertThat(globalPool.getTotalNumberOfMemorySegments(), 
is(numBuffers));
@@ -245,8 +245,8 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
-     * currently containing the number of required free segments.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with 
the {@link
+     * NetworkBufferPool} currently containing the number of required free 
segments.
      */
     @Test
     public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
IOException {
@@ -256,21 +256,21 @@ public class NetworkBufferPoolTest extends TestLogger {
 
         List<MemorySegment> memorySegments = Collections.emptyList();
         try {
-            memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
+            memorySegments = 
globalPool.requestUnpooledMemorySegments(numBuffers / 2);
             assertEquals(memorySegments.size(), numBuffers / 2);
 
-            globalPool.recycleMemorySegments(memorySegments);
+            globalPool.recycleUnpooledMemorySegments(memorySegments);
             memorySegments.clear();
             assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
         } finally {
-            globalPool.recycleMemorySegments(memorySegments); // just in case
+            globalPool.recycleUnpooledMemorySegments(memorySegments); // just 
in case
             globalPool.destroy();
         }
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
-     * buffers exceeding the capacity of {@link NetworkBufferPool}.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with 
the number of
+     * required buffers exceeding the capacity of {@link NetworkBufferPool}.
      */
     @Test
     public void testRequestMemorySegmentsMoreThanTotalBuffers() {
@@ -279,7 +279,7 @@ public class NetworkBufferPoolTest extends TestLogger {
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
 
         try {
-            globalPool.requestMemorySegments(numBuffers + 1);
+            globalPool.requestUnpooledMemorySegments(numBuffers + 1);
             fail("Should throw an IOException");
         } catch (IOException e) {
             assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
@@ -289,22 +289,22 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to cause
-     * exception.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with 
the invalid argument
+     * to cause exception.
      */
     @Test(expected = IllegalArgumentException.class)
     public void testRequestMemorySegmentsWithInvalidArgument() throws 
IOException {
         NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
         // the number of requested buffers should be non-negative
-        globalPool.requestMemorySegments(-1);
+        globalPool.requestUnpooledMemorySegments(-1);
         globalPool.destroy();
         fail("Should throw an IllegalArgumentException");
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
-     * currently not containing the number of required free segments 
(currently occupied by a buffer
-     * pool).
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with 
the {@link
+     * NetworkBufferPool} currently not containing the number of required free 
segments (currently
+     * occupied by a buffer pool).
      */
     @Test
     public void testRequestMemorySegmentsWithBuffersTaken()
@@ -347,7 +347,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // take more buffers than are freely available at the moment via 
requestMemorySegments()
             isRunning.await();
-            memorySegments = 
networkBufferPool.requestMemorySegments(numBuffers / 2);
+            memorySegments = 
networkBufferPool.requestUnpooledMemorySegments(numBuffers / 2);
             assertThat(memorySegments, not(hasItem(nullValue())));
         } finally {
             if (bufferRecycler != null) {
@@ -356,21 +356,21 @@ public class NetworkBufferPoolTest extends TestLogger {
             if (lbp1 != null) {
                 lbp1.lazyDestroy();
             }
-            networkBufferPool.recycleMemorySegments(memorySegments);
+            networkBufferPool.recycleUnpooledMemorySegments(memorySegments);
             networkBufferPool.destroy();
         }
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying 
it may be aborted in
-     * case of a concurrent {@link NetworkBufferPool#destroy()} call.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)}, 
verifying it may be
+     * aborted in case of a concurrent {@link NetworkBufferPool#destroy()} 
call.
      */
     @Test
     public void testRequestMemorySegmentsInterruptable() throws Exception {
         final int numBuffers = 10;
 
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
-        MemorySegment segment = globalPool.requestMemorySegment();
+        MemorySegment segment = globalPool.requestPooledMemorySegment();
         assertNotNull(segment);
 
         final OneShotLatch isRunning = new OneShotLatch();
@@ -379,7 +379,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                     @Override
                     public void go() throws IOException {
                         isRunning.trigger();
-                        globalPool.requestMemorySegments(10);
+                        globalPool.requestUnpooledMemorySegments(10);
                     }
                 };
         asyncRequest.start();
@@ -403,15 +403,15 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying 
it may be aborted and
-     * remains in a defined state even if the waiting is interrupted.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)}, 
verifying it may be
+     * aborted and remains in a defined state even if the waiting is 
interrupted.
      */
     @Test
     public void testRequestMemorySegmentsInterruptable2() throws Exception {
         final int numBuffers = 10;
 
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
-        MemorySegment segment = globalPool.requestMemorySegment();
+        MemorySegment segment = globalPool.requestPooledMemorySegment();
         assertNotNull(segment);
 
         final OneShotLatch isRunning = new OneShotLatch();
@@ -420,7 +420,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                     @Override
                     public void go() throws IOException {
                         isRunning.trigger();
-                        globalPool.requestMemorySegments(10);
+                        globalPool.requestUnpooledMemorySegments(10);
                     }
                 };
         asyncRequest.start();
@@ -432,7 +432,7 @@ public class NetworkBufferPoolTest extends TestLogger {
         Thread.sleep(10);
         asyncRequest.interrupt();
 
-        globalPool.recycle(segment);
+        globalPool.recyclePooledMemorySegment(segment);
 
         try {
             asyncRequest.sync();
@@ -448,7 +448,7 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} and verifies 
it will end
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} and 
verifies it will end
      * exceptionally when failing to acquire all the segments in the specific 
timeout.
      */
     @Test
@@ -471,7 +471,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                 new CheckedThread() {
                     @Override
                     public void go() throws Exception {
-                        
globalPool.requestMemorySegments(numberOfSegmentsToRequest);
+                        
globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
                     }
                 };
 
@@ -490,8 +490,8 @@ public class NetworkBufferPoolTest extends TestLogger {
     /**
      * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
      * correctly maintained after memory segments are requested by {@link
-     * NetworkBufferPool#requestMemorySegment()} and recycled by {@link
-     * NetworkBufferPool#recycle(MemorySegment)}.
+     * NetworkBufferPool#requestPooledMemorySegment()} and recycled by {@link
+     * NetworkBufferPool#recyclePooledMemorySegment(MemorySegment)}.
      */
     @Test
     public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() {
@@ -504,22 +504,22 @@ public class NetworkBufferPoolTest extends TestLogger {
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // request the first segment
-            final MemorySegment segment1 = 
checkNotNull(globalPool.requestMemorySegment());
+            final MemorySegment segment1 = 
checkNotNull(globalPool.requestPooledMemorySegment());
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // request the second segment
-            final MemorySegment segment2 = 
checkNotNull(globalPool.requestMemorySegment());
+            final MemorySegment segment2 = 
checkNotNull(globalPool.requestPooledMemorySegment());
             assertFalse(globalPool.getAvailableFuture().isDone());
 
             final CompletableFuture<?> availableFuture = 
globalPool.getAvailableFuture();
 
             // recycle the first segment
-            globalPool.recycle(segment1);
+            globalPool.recyclePooledMemorySegment(segment1);
             assertTrue(availableFuture.isDone());
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // recycle the second segment
-            globalPool.recycle(segment2);
+            globalPool.recyclePooledMemorySegment(segment2);
             assertTrue(globalPool.getAvailableFuture().isDone());
 
         } finally {
@@ -530,8 +530,8 @@ public class NetworkBufferPoolTest extends TestLogger {
     /**
      * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
      * correctly maintained after memory segments are requested by {@link
-     * NetworkBufferPool#requestMemorySegments(int)} and recycled by {@link
-     * NetworkBufferPool#recycleMemorySegments(Collection)}.
+     * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by 
{@link
+     * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}.
      */
     @Test(timeout = 10000L)
     public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments()
@@ -547,13 +547,13 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // request 5 segments
             List<MemorySegment> segments1 =
-                    
globalPool.requestMemorySegments(numberOfSegmentsToRequest);
+                    
globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
             assertTrue(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments1.size());
 
             // request another 5 segments
             List<MemorySegment> segments2 =
-                    
globalPool.requestMemorySegments(numberOfSegmentsToRequest);
+                    
globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
             assertFalse(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments2.size());
 
@@ -566,7 +566,8 @@ public class NetworkBufferPoolTest extends TestLogger {
                         public void go() throws Exception {
                             // this request should be blocked until at least 5 
segments are recycled
                             segments3.addAll(
-                                    
globalPool.requestMemorySegments(numberOfSegmentsToRequest));
+                                    globalPool.requestUnpooledMemorySegments(
+                                            numberOfSegmentsToRequest));
                             latch.countDown();
                         }
                     };
@@ -574,7 +575,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // recycle 5 segments
             CompletableFuture<?> availableFuture = 
globalPool.getAvailableFuture();
-            globalPool.recycleMemorySegments(segments1);
+            globalPool.recycleUnpooledMemorySegments(segments1);
             assertTrue(availableFuture.isDone());
 
             // wait util the third request is fulfilled
@@ -583,11 +584,11 @@ public class NetworkBufferPoolTest extends TestLogger {
             assertEquals(numberOfSegmentsToRequest, segments3.size());
 
             // recycle another 5 segments
-            globalPool.recycleMemorySegments(segments2);
+            globalPool.recycleUnpooledMemorySegments(segments2);
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // recycle the last 5 segments
-            globalPool.recycleMemorySegments(segments3);
+            globalPool.recycleUnpooledMemorySegments(segments3);
             assertTrue(globalPool.getAvailableFuture().isDone());
 
         } finally {
@@ -624,10 +625,10 @@ public class NetworkBufferPoolTest extends TestLogger {
             // request some segments from the global pool in two different ways
             final List<MemorySegment> segments = new 
ArrayList<>(numberOfSegmentsToRequest - 1);
             for (int i = 0; i < numberOfSegmentsToRequest - 1; ++i) {
-                segments.add(globalPool.requestMemorySegment());
+                segments.add(globalPool.requestPooledMemorySegment());
             }
             final List<MemorySegment> exclusiveSegments =
-                    globalPool.requestMemorySegments(
+                    globalPool.requestUnpooledMemorySegments(
                             globalPool.getNumberOfAvailableMemorySegments() - 
1);
             assertTrue(globalPool.getAvailableFuture().isDone());
             for (final BufferPool localPool : localBufferPools) {
@@ -674,9 +675,9 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // recycle the previously requested segments
             for (MemorySegment segment : segments) {
-                globalPool.recycle(segment);
+                globalPool.recyclePooledMemorySegment(segment);
             }
-            globalPool.recycleMemorySegments(exclusiveSegments);
+            globalPool.recycleUnpooledMemorySegments(exclusiveSegments);
 
             assertTrue(globalPoolAvailableFuture.isDone());
             for (CompletableFuture<?> localPoolAvailableFuture : 
localPoolAvailableFutures) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 8abcdd1..9d336cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -254,12 +254,13 @@ public class InputChannelTestUtils {
         private StubMemorySegmentProvider() {}
 
         @Override
-        public Collection<MemorySegment> requestMemorySegments(int 
numberOfSegmentsToRequest) {
+        public Collection<MemorySegment> requestUnpooledMemorySegments(
+                int numberOfSegmentsToRequest) {
             return Collections.emptyList();
         }
 
         @Override
-        public void recycleMemorySegments(Collection<MemorySegment> segments) 
{}
+        public void recycleUnpooledMemorySegments(Collection<MemorySegment> 
segments) {}
     }
 
     /** {@link MemorySegmentProvider} that provides unpooled {@link 
MemorySegment}s. */
@@ -271,12 +272,13 @@ public class InputChannelTestUtils {
         }
 
         @Override
-        public Collection<MemorySegment> requestMemorySegments(int 
numberOfSegmentsToRequest) {
+        public Collection<MemorySegment> requestUnpooledMemorySegments(
+                int numberOfSegmentsToRequest) {
             return Collections.singletonList(
                     MemorySegmentFactory.allocateUnpooledSegment(pageSize));
         }
 
         @Override
-        public void recycleMemorySegments(Collection<MemorySegment> segments) 
{}
+        public void recycleUnpooledMemorySegments(Collection<MemorySegment> 
segments) {}
     }
 }

Reply via email to