This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2edc003004d0d0f1e40ef2b7e14189965183f46c Author: Weijie Guo <res...@163.com> AuthorDate: Fri Apr 21 17:35:23 2023 +0800 [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool. --- .../runtime/io/network/buffer/LocalBufferPool.java | 72 ++++++---------------- .../io/network/buffer/LocalBufferPoolTest.java | 21 +++++-- 2 files changed, 35 insertions(+), 58 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 190734c35b4..6d6d236f902 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -49,11 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; * * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match - * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments + - * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In - * order to meet this requirement, when the size of this pool changes, - * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to - * each other. + * its new size. + * + * <p>New buffers can be requested only when {@code numberOfRequestedMemorySegments < + * currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers exceeding the + * currentPoolSize will be dynamically regarded as overdraft buffers. * * <p>Availability is defined as returning a non-overdraft segment on a subsequent {@link * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link @@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool { private int maxOverdraftBuffersPerGate; - @GuardedBy("availableMemorySegments") - private int numberOfRequestedOverdraftMemorySegments; - @GuardedBy("availableMemorySegments") private boolean isDestroyed; @@ -306,13 +303,6 @@ class LocalBufferPool implements BufferPool { } } - @VisibleForTesting - public int getNumberOfRequestedOverdraftMemorySegments() { - synchronized (availableMemorySegments) { - return numberOfRequestedOverdraftMemorySegments; - } - } - @Override public int getNumberOfAvailableMemorySegments() { synchronized (availableMemorySegments) { @@ -331,11 +321,7 @@ class LocalBufferPool implements BufferPool { @SuppressWarnings("FieldAccessNotGuarded") @Override public int bestEffortGetNumOfUsedBuffers() { - return Math.max( - 0, - numberOfRequestedMemorySegments - + numberOfRequestedOverdraftMemorySegments - - availableMemorySegments.size()); + return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size()); } @Override @@ -452,14 +438,9 @@ class LocalBufferPool implements BufferPool { return false; } - checkState( - !isDestroyed, - "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); - - MemorySegment segment = networkBufferPool.requestPooledMemorySegment(); + MemorySegment segment = requestPooledMemorySegment(); if (segment != null) { availableMemorySegments.add(segment); - numberOfRequestedMemorySegments++; return true; } return false; @@ -469,17 +450,25 @@ class LocalBufferPool implements BufferPool { private MemorySegment requestOverdraftMemorySegmentFromGlobal() { assert Thread.holdsLock(availableMemorySegments); - if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) { + // if overdraft buffers(i.e. buffers exceeding poolSize) is greater than or equal to + // maxOverdraftBuffersPerGate, no new buffer can be requested. + if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) { return null; } + return requestPooledMemorySegment(); + } + + @Nullable + @GuardedBy("availableMemorySegments") + private MemorySegment requestPooledMemorySegment() { checkState( !isDestroyed, "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); MemorySegment segment = networkBufferPool.requestPooledMemorySegment(); if (segment != null) { - numberOfRequestedOverdraftMemorySegments++; + numberOfRequestedMemorySegments++; } return segment; } @@ -525,9 +514,7 @@ class LocalBufferPool implements BufferPool { private boolean shouldBeAvailable() { assert Thread.holdsLock(availableMemorySegments); - return !availableMemorySegments.isEmpty() - && unavailableSubpartitionsCount == 0 - && numberOfRequestedOverdraftMemorySegments == 0; + return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0; } @GuardedBy("availableMemorySegments") @@ -684,19 +671,6 @@ class LocalBufferPool implements BufferPool { currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments); - // If pool size increases, try to convert overdraft buffer to ordinary buffer. - while (numberOfRequestedOverdraftMemorySegments > 0 - && numberOfRequestedMemorySegments < currentPoolSize) { - numberOfRequestedOverdraftMemorySegments--; - numberOfRequestedMemorySegments++; - } - - // If pool size decreases, try to convert ordinary buffer to overdraft buffer. - while (numberOfRequestedMemorySegments > currentPoolSize) { - numberOfRequestedMemorySegments--; - numberOfRequestedOverdraftMemorySegments++; - } - returnExcessMemorySegments(); if (isDestroyed) { @@ -760,12 +734,7 @@ class LocalBufferPool implements BufferPool { private void returnMemorySegment(MemorySegment segment) { assert Thread.holdsLock(availableMemorySegments); - // When using the overdraft buffer, return the overdraft buffer first. - if (numberOfRequestedOverdraftMemorySegments > 0) { - numberOfRequestedOverdraftMemorySegments--; - } else { - numberOfRequestedMemorySegments--; - } + numberOfRequestedMemorySegments--; networkBufferPool.recyclePooledMemorySegment(segment); } @@ -785,8 +754,7 @@ class LocalBufferPool implements BufferPool { @GuardedBy("availableMemorySegments") private boolean hasExcessBuffers() { - return numberOfRequestedOverdraftMemorySegments > 0 - || numberOfRequestedMemorySegments > currentPoolSize; + return numberOfRequestedMemorySegments > currentPoolSize; } @GuardedBy("availableMemorySegments") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index 4646fee2f49..6126d07e4ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -307,9 +307,9 @@ class LocalBufferPoolTest { // set a small pool size. bufferPool.setNumBuffers(smallPoolSize); assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()) + assertThat(getNumberRequestedOverdraftBuffers(bufferPool)) .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing); - assertThat(bufferPool.getNumberOfRequestedMemorySegments()) + assertThat(getNumberRequestedOrdinaryBuffers(bufferPool)) .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing); assertThat(bufferPool.getNumberOfAvailableMemorySegments()) .isEqualTo(numAvailableBuffersAfterDecreasing); @@ -396,8 +396,7 @@ class LocalBufferPoolTest { buffers.add(bufferPool.requestMemorySegmentBlocking()); } assertThat(bufferPool.requestMemorySegment()).isNull(); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()) - .isEqualTo(maxOverdraftBuffers); + assertThat(getNumberRequestedOverdraftBuffers(bufferPool)).isEqualTo(maxOverdraftBuffers); assertThat(bufferPool.isAvailable()).isFalse(); // set a large pool size. @@ -405,7 +404,7 @@ class LocalBufferPoolTest { assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); assertThat(bufferPool.getNumberOfAvailableMemorySegments()) .isEqualTo(numAvailableBuffersAfterIncreasePoolSize); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()) + assertThat(getNumberRequestedOverdraftBuffers(bufferPool)) .isEqualTo(numOverdraftBuffersAfterIncreasePoolSize); assertThat(bufferPool.isAvailable()).isEqualTo(isAvailableAfterIncreasePoolSize); @@ -864,7 +863,7 @@ class LocalBufferPoolTest { if (numberOfRequestedOverdraftBuffer > 0) { checkArgument(!isAvailable); } - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()) + assertThat(getNumberRequestedOverdraftBuffers(bufferPool)) .isEqualTo(numberOfRequestedOverdraftBuffer); assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numberOfRequestedBuffer); @@ -875,6 +874,16 @@ class LocalBufferPoolTest { // Helpers // ------------------------------------------------------------------------ + private static int getNumberRequestedOverdraftBuffers(LocalBufferPool bufferPool) { + return Math.max( + bufferPool.getNumberOfRequestedMemorySegments() - bufferPool.getNumBuffers(), 0); + } + + private static int getNumberRequestedOrdinaryBuffers(LocalBufferPool bufferPool) { + return Math.min( + bufferPool.getNumBuffers(), bufferPool.getNumberOfRequestedMemorySegments()); + } + private int getNumRequestedFromMemorySegmentPool() { return networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments();