This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ffc6f3bfabd [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
ffc6f3bfabd is described below

commit ffc6f3bfabd22b49b08f027400c194a8e7c9c51a
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Apr 11 23:33:57 2023 +0800

    [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 14 +++-
 .../io/network/buffer/LocalBufferPoolTest.java     | 85 ++++++++++++----------
 2 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 87b8a11e2aa..6506ab9f942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -49,7 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link 
#setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link 
NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code 
numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + 
maxOverdraftBuffersPerGate}. In
+ * order to meet this requirement, when the size of this pool changes,
+ * numberOfRequestedMemorySegments and 
numberOfRequestedOverdraftMemorySegments can be converted to
+ * each other.
  *
  * <p>Availability is defined as returning a non-overdraft segment on a 
subsequent {@link
  * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a 
non-blocking {@link
@@ -671,13 +675,19 @@ class LocalBufferPool implements BufferPool {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-            // reset overdraft buffers
+            // If pool size increases, try to convert overdraft buffer to 
ordinary buffer.
             while (numberOfRequestedOverdraftMemorySegments > 0
                     && numberOfRequestedMemorySegments < currentPoolSize) {
                 numberOfRequestedOverdraftMemorySegments--;
                 numberOfRequestedMemorySegments++;
             }
 
+            // If pool size decreases, try to convert ordinary buffer to 
overdraft buffer.
+            while (numberOfRequestedMemorySegments > currentPoolSize) {
+                numberOfRequestedMemorySegments--;
+                numberOfRequestedOverdraftMemorySegments++;
+            }
+
             returnExcessMemorySegments();
 
             if (isDestroyed) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 956d55e5948..6c0fcf13b47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -38,10 +38,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -255,9 +253,38 @@ class LocalBufferPoolTest {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 
0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 
1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 
0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 
0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 
0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 
0);
+    }
+
+    void testDecreasePoolSizeInternal(
+            int maxMemorySegments,
+            int requiredMemorySegments,
+            int largePoolSize,
+            int smallPoolSize,
+            int maxOverdraftBuffers,
+            int numBuffersToRequest,
+            int numRequestedOverdraftBuffersAfterDecreasing,
+            int numRequestedOrdinaryBuffersAfterDecreasing,
+            int numAvailableBuffersAfterDecreasing)
+            throws Exception {
         LocalBufferPool bufferPool =
                 new LocalBufferPool(
                         networkBufferPool,
@@ -266,51 +293,33 @@ class LocalBufferPoolTest {
                         0,
                         Integer.MAX_VALUE,
                         maxOverdraftBuffers);
-        Queue<MemorySegment> buffers = new LinkedList<>();
+        List<MemorySegment> buffers = new ArrayList<>();
 
         // set a larger pool size.
         bufferPool.setNumBuffers(largePoolSize);
         assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
 
-        // request all buffer.
-        for (int i = 0; i < largePoolSize; i++) {
+        // request buffers.
+        for (int i = 0; i < numBuffersToRequest; i++) {
             buffers.add(bufferPool.requestMemorySegmentBlocking());
         }
-        assertThat(bufferPool.isAvailable()).isFalse();
-
-        // request 1 overdraft buffers.
-        buffers.add(bufferPool.requestMemorySegmentBlocking());
-        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
-        assertThat(bufferPool.isAvailable()).isFalse();
 
         // set a small pool size.
         bufferPool.setNumBuffers(smallPoolSize);
         assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
-        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
-        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
-        assertThat(bufferPool.isAvailable()).isFalse();
-        buffers.add(bufferPool.requestMemorySegmentBlocking());
-        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
-        assertThat(bufferPool.isAvailable()).isFalse();
-
-        // return all overdraft buffers.
-        bufferPool.recycle(buffers.poll());
-        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
-        assertThat(bufferPool.isAvailable()).isFalse();
-        bufferPool.recycle(buffers.poll());
-        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
-        assertThat(bufferPool.isAvailable()).isFalse();
-
-        // return the excess buffer.
-        bufferPool.recycle(buffers.poll());
-        assertThat(bufferPool.isAvailable()).isFalse();
-        // return non-excess buffers.
-        bufferPool.recycle(buffers.poll());
-        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
-        assertThat(bufferPool.isAvailable()).isTrue();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+                .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing);
+        assertThat(
+                        bufferPool.bestEffortGetNumOfUsedBuffers()
+                                + 
bufferPool.getNumberOfAvailableMemorySegments()
+                                - 
bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+                .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments())
+                .isEqualTo(numAvailableBuffersAfterDecreasing);
+        
assertThat(bufferPool.isAvailable()).isEqualTo(numAvailableBuffersAfterDecreasing
 > 0);
 
-        while (!buffers.isEmpty()) {
-            bufferPool.recycle(buffers.poll());
+        for (MemorySegment buffer : buffers) {
+            bufferPool.recycle(buffer);
         }
         bufferPool.lazyDestroy();
     }

Reply via email to