This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f80d7a4428e26b319172a56e02976bcbf5707a4f Author: Weijie Guo <res...@163.com> AuthorDate: Fri Apr 21 17:46:02 2023 +0800 [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name. --- .../runtime/io/network/buffer/LocalBufferPool.java | 11 +++++++++- .../io/network/buffer/NetworkBufferPool.java | 18 ++++++++-------- .../network/metrics/NettyShuffleMetricFactory.java | 2 +- .../io/network/buffer/LocalBufferPoolTest.java | 5 +---- .../io/network/buffer/NetworkBufferPoolTest.java | 24 +++++++++++----------- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index e012c725582..190734c35b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool { } /** + * Estimates the number of requested buffers. + * * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For * unbounded pools it returns an approximation based upon {@link * #getNumberOfRequiredMemorySegments()} */ - public int getNumberOfRequestedMemorySegments() { + public int getEstimatedNumberOfRequestedMemorySegments() { if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) { return maxNumberOfMemorySegments; } else { @@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool { } } + @VisibleForTesting + public int getNumberOfRequestedMemorySegments() { + synchronized (availableMemorySegments) { + return numberOfRequestedMemorySegments; + } + } + @VisibleForTesting public int getNumberOfRequestedOverdraftMemorySegments() { synchronized (availableMemorySegments) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 24ca78fd551..c14f89fe096 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -366,36 +366,38 @@ public class NetworkBufferPool } } - public long getNumberOfRequestedMemorySegments() { + public long getEstimatedNumberOfRequestedMemorySegments() { long requestedSegments = 0; synchronized (factoryLock) { for (LocalBufferPool bufferPool : allBufferPools) { - requestedSegments += bufferPool.getNumberOfRequestedMemorySegments(); + requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments(); } } return requestedSegments; } - public long getRequestedMemory() { - return getNumberOfRequestedMemorySegments() * memorySegmentSize; + public long getEstimatedRequestedMemory() { + return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize; } - public int getRequestedSegmentsUsage() { + public int getEstimatedRequestedSegmentsUsage() { int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments(); return totalNumberOfMemorySegments == 0 ? 0 : Math.toIntExact( - 100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments); + 100L + * getEstimatedNumberOfRequestedMemorySegments() + / totalNumberOfMemorySegments); } @VisibleForTesting Optional<String> getUsageWarning() { - int currentUsage = getRequestedSegmentsUsage(); + int currentUsage = getEstimatedRequestedSegmentsUsage(); Optional<String> message = Optional.empty(); // do not log warning if the value hasn't changed to avoid spamming warnings. if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) { long totalMemory = getTotalMemory(); - long requestedMemory = getRequestedMemory(); + long requestedMemory = getEstimatedRequestedMemory(); long missingMemory = requestedMemory - totalMemory; message = Optional.of( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java index 2aaffe2d681..817190732b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java @@ -244,7 +244,7 @@ public class NettyShuffleMetricFactory { @Override public Integer getValue() { - return networkBufferPool.getRequestedSegmentsUsage(); + return networkBufferPool.getEstimatedRequestedSegmentsUsage(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index 6c0fcf13b47..4646fee2f49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -309,10 +309,7 @@ class LocalBufferPoolTest { assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()) .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing); - assertThat( - bufferPool.bestEffortGetNumOfUsedBuffers() - + bufferPool.getNumberOfAvailableMemorySegments() - - bufferPool.getNumberOfRequestedOverdraftMemorySegments()) + assertThat(bufferPool.getNumberOfRequestedMemorySegments()) .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing); assertThat(bufferPool.getNumberOfAvailableMemorySegments()) .isEqualTo(numAvailableBuffersAfterDecreasing); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index 5a5bd186b3f..20fd5964b48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -330,7 +330,7 @@ public class NetworkBufferPoolTest extends TestLogger { try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { NetworkBufferPool globalPool = new NetworkBufferPool(0, 128); closeableRegistry.registerCloseable(globalPool::destroy); - assertEquals(0, globalPool.getRequestedSegmentsUsage()); + assertEquals(0, globalPool.getEstimatedRequestedSegmentsUsage()); } } @@ -342,21 +342,21 @@ public class NetworkBufferPoolTest extends TestLogger { BufferPool bufferPool1 = globalPool.createBufferPool(10, 20); - assertEquals(20, globalPool.getNumberOfRequestedMemorySegments()); - assertEquals(40, globalPool.getRequestedSegmentsUsage()); + assertEquals(20, globalPool.getEstimatedNumberOfRequestedMemorySegments()); + assertEquals(40, globalPool.getEstimatedRequestedSegmentsUsage()); assertThat(globalPool.getUsageWarning(), equalTo(Optional.empty())); closeableRegistry.registerCloseable( (globalPool.createBufferPool(5, Integer.MAX_VALUE))::lazyDestroy); - assertEquals(30, globalPool.getNumberOfRequestedMemorySegments()); - assertEquals(60, globalPool.getRequestedSegmentsUsage()); + assertEquals(30, globalPool.getEstimatedNumberOfRequestedMemorySegments()); + assertEquals(60, globalPool.getEstimatedRequestedSegmentsUsage()); assertThat(globalPool.getUsageWarning(), equalTo(Optional.empty())); closeableRegistry.registerCloseable((globalPool.createBufferPool(10, 30))::lazyDestroy); - assertEquals(60, globalPool.getNumberOfRequestedMemorySegments()); - assertEquals(120, globalPool.getRequestedSegmentsUsage()); + assertEquals(60, globalPool.getEstimatedNumberOfRequestedMemorySegments()); + assertEquals(120, globalPool.getEstimatedRequestedSegmentsUsage()); assertThat( globalPool.getUsageWarning(), equalTo( @@ -372,8 +372,8 @@ public class NetworkBufferPoolTest extends TestLogger { BufferPool bufferPool2 = globalPool.createBufferPool(10, 20); - assertEquals(80, globalPool.getNumberOfRequestedMemorySegments()); - assertEquals(160, globalPool.getRequestedSegmentsUsage()); + assertEquals(80, globalPool.getEstimatedNumberOfRequestedMemorySegments()); + assertEquals(160, globalPool.getEstimatedRequestedSegmentsUsage()); assertThat( globalPool.getUsageWarning(), equalTo( @@ -389,9 +389,9 @@ public class NetworkBufferPoolTest extends TestLogger { bufferPool2.lazyDestroy(); bufferPool1.lazyDestroy(); - assertEquals(40, globalPool.getNumberOfRequestedMemorySegments()); - assertEquals(40 * 128, globalPool.getRequestedMemory()); - assertEquals(80, globalPool.getRequestedSegmentsUsage()); + assertEquals(40, globalPool.getEstimatedNumberOfRequestedMemorySegments()); + assertEquals(40 * 128, globalPool.getEstimatedRequestedMemory()); + assertEquals(80, globalPool.getEstimatedRequestedSegmentsUsage()); assertThat( globalPool.getUsageWarning(), equalTo(Optional.of("Memory usage [80%] went back to normal")));