This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ffc6f3bfabd [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased ffc6f3bfabd is described below commit ffc6f3bfabd22b49b08f027400c194a8e7c9c51a Author: Weijie Guo <res...@163.com> AuthorDate: Tue Apr 11 23:33:57 2023 +0800 [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased --- .../runtime/io/network/buffer/LocalBufferPool.java | 14 +++- .../io/network/buffer/LocalBufferPoolTest.java | 85 ++++++++++++---------- 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 87b8a11e2aa..6506ab9f942 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -49,7 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; * * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match - * its new size. + * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments + + * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In + * order to meet this requirement, when the size of this pool changes, + * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to + * each other. * * <p>Availability is defined as returning a non-overdraft segment on a subsequent {@link * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link @@ -671,13 +675,19 @@ class LocalBufferPool implements BufferPool { currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments); - // reset overdraft buffers + // If pool size increases, try to convert overdraft buffer to ordinary buffer. while (numberOfRequestedOverdraftMemorySegments > 0 && numberOfRequestedMemorySegments < currentPoolSize) { numberOfRequestedOverdraftMemorySegments--; numberOfRequestedMemorySegments++; } + // If pool size decreases, try to convert ordinary buffer to overdraft buffer. + while (numberOfRequestedMemorySegments > currentPoolSize) { + numberOfRequestedMemorySegments--; + numberOfRequestedOverdraftMemorySegments++; + } + returnExcessMemorySegments(); if (isDestroyed) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index 956d55e5948..6c0fcf13b47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -38,10 +38,8 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -255,9 +253,38 @@ class LocalBufferPoolTest { void testDecreasePoolSize() throws Exception { final int maxMemorySegments = 10; final int requiredMemorySegments = 4; - final int maxOverdraftBuffers = 2; - final int largePoolSize = 5; - final int smallPoolSize = 4; + + // requested buffers is equal to small pool size. + testDecreasePoolSizeInternal( + maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0); + // requested buffers is less than small pool size. + testDecreasePoolSizeInternal( + maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1); + // exceed buffers is equal to maxOverdraftBuffers + testDecreasePoolSizeInternal( + maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0); + // exceed buffers is greater than maxOverdraftBuffers + testDecreasePoolSizeInternal( + maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0); + // exceed buffers is less than maxOverdraftBuffers + testDecreasePoolSizeInternal( + maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0); + // decrease pool size with overdraft buffer. + testDecreasePoolSizeInternal( + maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0); + } + + void testDecreasePoolSizeInternal( + int maxMemorySegments, + int requiredMemorySegments, + int largePoolSize, + int smallPoolSize, + int maxOverdraftBuffers, + int numBuffersToRequest, + int numRequestedOverdraftBuffersAfterDecreasing, + int numRequestedOrdinaryBuffersAfterDecreasing, + int numAvailableBuffersAfterDecreasing) + throws Exception { LocalBufferPool bufferPool = new LocalBufferPool( networkBufferPool, @@ -266,51 +293,33 @@ class LocalBufferPoolTest { 0, Integer.MAX_VALUE, maxOverdraftBuffers); - Queue<MemorySegment> buffers = new LinkedList<>(); + List<MemorySegment> buffers = new ArrayList<>(); // set a larger pool size. bufferPool.setNumBuffers(largePoolSize); assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); - // request all buffer. - for (int i = 0; i < largePoolSize; i++) { + // request buffers. + for (int i = 0; i < numBuffersToRequest; i++) { buffers.add(bufferPool.requestMemorySegmentBlocking()); } - assertThat(bufferPool.isAvailable()).isFalse(); - - // request 1 overdraft buffers. - buffers.add(bufferPool.requestMemorySegmentBlocking()); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); - assertThat(bufferPool.isAvailable()).isFalse(); // set a small pool size. bufferPool.setNumBuffers(smallPoolSize); assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); - assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero(); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); - assertThat(bufferPool.isAvailable()).isFalse(); - buffers.add(bufferPool.requestMemorySegmentBlocking()); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); - assertThat(bufferPool.isAvailable()).isFalse(); - - // return all overdraft buffers. - bufferPool.recycle(buffers.poll()); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); - assertThat(bufferPool.isAvailable()).isFalse(); - bufferPool.recycle(buffers.poll()); - assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero(); - assertThat(bufferPool.isAvailable()).isFalse(); - - // return the excess buffer. - bufferPool.recycle(buffers.poll()); - assertThat(bufferPool.isAvailable()).isFalse(); - // return non-excess buffers. - bufferPool.recycle(buffers.poll()); - assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); - assertThat(bufferPool.isAvailable()).isTrue(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()) + .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing); + assertThat( + bufferPool.bestEffortGetNumOfUsedBuffers() + + bufferPool.getNumberOfAvailableMemorySegments() + - bufferPool.getNumberOfRequestedOverdraftMemorySegments()) + .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing); + assertThat(bufferPool.getNumberOfAvailableMemorySegments()) + .isEqualTo(numAvailableBuffersAfterDecreasing); + assertThat(bufferPool.isAvailable()).isEqualTo(numAvailableBuffersAfterDecreasing > 0); - while (!buffers.isEmpty()) { - bufferPool.recycle(buffers.poll()); + for (MemorySegment buffer : buffers) { + bufferPool.recycle(buffer); } bufferPool.lazyDestroy(); }