This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3379c2c085da43bb452536c981a7fc13f39482ee
Author: Weijie Guo <res...@163.com>
AuthorDate: Fri Apr 21 17:35:23 2023 +0800

    [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments 
in LocalBufferPool.
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 72 ++++++----------------
 .../io/network/buffer/LocalBufferPoolTest.java     | 21 +++++--
 2 files changed, 35 insertions(+), 58 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 190734c35b4..6d6d236f902 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -49,11 +49,11 @@ import static 
org.apache.flink.util.concurrent.FutureUtils.assertNoException;
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link 
#setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link 
NetworkBufferPool} to match
- * its new size. New buffers can be requested only when {@code 
numberOfRequestedMemorySegments +
- * numberOfRequestedOverdraftMemorySegments < currentPoolSize + 
maxOverdraftBuffersPerGate}. In
- * order to meet this requirement, when the size of this pool changes,
- * numberOfRequestedMemorySegments and 
numberOfRequestedOverdraftMemorySegments can be converted to
- * each other.
+ * its new size.
+ *
+ * <p>New buffers can be requested only when {@code 
numberOfRequestedMemorySegments <
+ * currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers 
exceeding the
+ * currentPoolSize will be dynamically regarded as overdraft buffers.
  *
  * <p>Availability is defined as returning a non-overdraft segment on a 
subsequent {@link
  * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a 
non-blocking {@link
@@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool {
 
     private int maxOverdraftBuffersPerGate;
 
-    @GuardedBy("availableMemorySegments")
-    private int numberOfRequestedOverdraftMemorySegments;
-
     @GuardedBy("availableMemorySegments")
     private boolean isDestroyed;
 
@@ -306,13 +303,6 @@ class LocalBufferPool implements BufferPool {
         }
     }
 
-    @VisibleForTesting
-    public int getNumberOfRequestedOverdraftMemorySegments() {
-        synchronized (availableMemorySegments) {
-            return numberOfRequestedOverdraftMemorySegments;
-        }
-    }
-
     @Override
     public int getNumberOfAvailableMemorySegments() {
         synchronized (availableMemorySegments) {
@@ -331,11 +321,7 @@ class LocalBufferPool implements BufferPool {
     @SuppressWarnings("FieldAccessNotGuarded")
     @Override
     public int bestEffortGetNumOfUsedBuffers() {
-        return Math.max(
-                0,
-                numberOfRequestedMemorySegments
-                        + numberOfRequestedOverdraftMemorySegments
-                        - availableMemorySegments.size());
+        return Math.max(0, numberOfRequestedMemorySegments - 
availableMemorySegments.size());
     }
 
     @Override
@@ -452,14 +438,9 @@ class LocalBufferPool implements BufferPool {
             return false;
         }
 
-        checkState(
-                !isDestroyed,
-                "Destroyed buffer pools should never acquire segments - this 
will lead to buffer leaks.");
-
-        MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
+        MemorySegment segment = requestPooledMemorySegment();
         if (segment != null) {
             availableMemorySegments.add(segment);
-            numberOfRequestedMemorySegments++;
             return true;
         }
         return false;
@@ -469,17 +450,25 @@ class LocalBufferPool implements BufferPool {
     private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        if (numberOfRequestedOverdraftMemorySegments >= 
maxOverdraftBuffersPerGate) {
+        // if overdraft buffers(i.e. buffers exceeding poolSize) is greater 
than or equal to
+        // maxOverdraftBuffersPerGate, no new buffer can be requested.
+        if (numberOfRequestedMemorySegments - currentPoolSize >= 
maxOverdraftBuffersPerGate) {
             return null;
         }
 
+        return requestPooledMemorySegment();
+    }
+
+    @Nullable
+    @GuardedBy("availableMemorySegments")
+    private MemorySegment requestPooledMemorySegment() {
         checkState(
                 !isDestroyed,
                 "Destroyed buffer pools should never acquire segments - this 
will lead to buffer leaks.");
 
         MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
         if (segment != null) {
-            numberOfRequestedOverdraftMemorySegments++;
+            numberOfRequestedMemorySegments++;
         }
         return segment;
     }
@@ -525,9 +514,7 @@ class LocalBufferPool implements BufferPool {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;
     }
 
     @GuardedBy("availableMemorySegments")
@@ -684,19 +671,6 @@ class LocalBufferPool implements BufferPool {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-            // If pool size increases, try to convert overdraft buffer to 
ordinary buffer.
-            while (numberOfRequestedOverdraftMemorySegments > 0
-                    && numberOfRequestedMemorySegments < currentPoolSize) {
-                numberOfRequestedOverdraftMemorySegments--;
-                numberOfRequestedMemorySegments++;
-            }
-
-            // If pool size decreases, try to convert ordinary buffer to 
overdraft buffer.
-            while (numberOfRequestedMemorySegments > currentPoolSize) {
-                numberOfRequestedMemorySegments--;
-                numberOfRequestedOverdraftMemorySegments++;
-            }
-
             returnExcessMemorySegments();
 
             if (isDestroyed) {
@@ -760,12 +734,7 @@ class LocalBufferPool implements BufferPool {
     private void returnMemorySegment(MemorySegment segment) {
         assert Thread.holdsLock(availableMemorySegments);
 
-        // When using the overdraft buffer, return the overdraft buffer first.
-        if (numberOfRequestedOverdraftMemorySegments > 0) {
-            numberOfRequestedOverdraftMemorySegments--;
-        } else {
-            numberOfRequestedMemorySegments--;
-        }
+        numberOfRequestedMemorySegments--;
         networkBufferPool.recyclePooledMemorySegment(segment);
     }
 
@@ -785,8 +754,7 @@ class LocalBufferPool implements BufferPool {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedMemorySegments > currentPoolSize;
     }
 
     @GuardedBy("availableMemorySegments")
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 4646fee2f49..6126d07e4ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -307,9 +307,9 @@ class LocalBufferPoolTest {
         // set a small pool size.
         bufferPool.setNumBuffers(smallPoolSize);
         assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
-        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+        assertThat(getNumberRequestedOverdraftBuffers(bufferPool))
                 .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing);
-        assertThat(bufferPool.getNumberOfRequestedMemorySegments())
+        assertThat(getNumberRequestedOrdinaryBuffers(bufferPool))
                 .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing);
         assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                 .isEqualTo(numAvailableBuffersAfterDecreasing);
@@ -396,8 +396,7 @@ class LocalBufferPoolTest {
             buffers.add(bufferPool.requestMemorySegmentBlocking());
         }
         assertThat(bufferPool.requestMemorySegment()).isNull();
-        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
-                .isEqualTo(maxOverdraftBuffers);
+        
assertThat(getNumberRequestedOverdraftBuffers(bufferPool)).isEqualTo(maxOverdraftBuffers);
         assertThat(bufferPool.isAvailable()).isFalse();
 
         // set a large pool size.
@@ -405,7 +404,7 @@ class LocalBufferPoolTest {
         assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
         assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                 .isEqualTo(numAvailableBuffersAfterIncreasePoolSize);
-        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+        assertThat(getNumberRequestedOverdraftBuffers(bufferPool))
                 .isEqualTo(numOverdraftBuffersAfterIncreasePoolSize);
         
assertThat(bufferPool.isAvailable()).isEqualTo(isAvailableAfterIncreasePoolSize);
 
@@ -864,7 +863,7 @@ class LocalBufferPoolTest {
         if (numberOfRequestedOverdraftBuffer > 0) {
             checkArgument(!isAvailable);
         }
-        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+        assertThat(getNumberRequestedOverdraftBuffers(bufferPool))
                 .isEqualTo(numberOfRequestedOverdraftBuffer);
 
         
assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numberOfRequestedBuffer);
@@ -875,6 +874,16 @@ class LocalBufferPoolTest {
     // Helpers
     // ------------------------------------------------------------------------
 
+    private static int getNumberRequestedOverdraftBuffers(LocalBufferPool 
bufferPool) {
+        return Math.max(
+                bufferPool.getNumberOfRequestedMemorySegments() - 
bufferPool.getNumBuffers(), 0);
+    }
+
+    private static int getNumberRequestedOrdinaryBuffers(LocalBufferPool 
bufferPool) {
+        return Math.min(
+                bufferPool.getNumBuffers(), 
bufferPool.getNumberOfRequestedMemorySegments());
+    }
+
     private int getNumRequestedFromMemorySegmentPool() {
         return networkBufferPool.getTotalNumberOfMemorySegments()
                 - networkBufferPool.getNumberOfAvailableMemorySegments();

Reply via email to