This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new 790434b9fb7 [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently 790434b9fb7 is described below commit 790434b9fb7296420c1ea15af0d640273776d0b0 Author: Weijie Guo <res...@163.com> AuthorDate: Tue Sep 27 15:24:30 2022 +0800 [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently This closes #20904 --- .../partition/hybrid/HsFullSpillingStrategy.java | 32 +++++++------ .../partition/hybrid/HsMemoryDataManager.java | 39 ++++++++++++++-- .../partition/hybrid/HsResultPartition.java | 3 +- .../hybrid/HsSelectiveSpillingStrategy.java | 2 +- .../partition/hybrid/HsSpillingStrategy.java | 2 +- .../hybrid/HybridShuffleConfiguration.java | 53 +++++++++++++++------- .../hybrid/HsFullSpillingStrategyTest.java | 20 +++++--- .../partition/hybrid/HsMemoryDataManagerTest.java | 49 ++++++++++++++++++-- .../partition/hybrid/HsResultPartitionTest.java | 28 ------------ .../hybrid/HsSelectiveSpillingStrategyTest.java | 2 +- .../partition/hybrid/HsSubpartitionViewTest.java | 3 +- .../partition/hybrid/TestingSpillingStrategy.java | 14 +++--- 12 files changed, 164 insertions(+), 83 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index cfc737d2efd..f0339ee152a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -32,25 +32,25 @@ import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStr /** A special implementation of {@link HsSpillingStrategy} that spilled all buffers to disk. */ public class HsFullSpillingStrategy implements HsSpillingStrategy { - private final int numBuffersTriggerSpilling; + private final float numBuffersTriggerSpillingRatio; private final float releaseBufferRatio; private final float releaseThreshold; public HsFullSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) { - this.numBuffersTriggerSpilling = - hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpilling(); + this.numBuffersTriggerSpillingRatio = + hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpillingRatio(); this.releaseThreshold = hybridShuffleConfiguration.getFullStrategyReleaseThreshold(); this.releaseBufferRatio = hybridShuffleConfiguration.getFullStrategyReleaseBufferRatio(); } // For the case of buffer finished, whenever the number of unSpillBuffers reaches - // numBuffersTriggerSpilling, make a decision based on global information. Otherwise, no need to - // take action. + // numBuffersTriggerSpillingRatio times currentPoolSize, make a decision based on global + // information. Otherwise, no need to take action. @Override - public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) { - return numTotalUnSpillBuffers < numBuffersTriggerSpilling + public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) { + return numTotalUnSpillBuffers < numBuffersTriggerSpillingRatio * currentPoolSize ? Optional.of(Decision.NO_ACTION) : Optional.empty(); } @@ -74,8 +74,11 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { @Override public Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider) { Decision.Builder builder = Decision.builder(); - checkSpill(spillingInfoProvider, builder); - checkRelease(spillingInfoProvider, builder); + // Save the cost of lock, if pool size is changed between checkSpill and checkRelease, pool + // size checker will handle this inconsistency. + int poolSize = spillingInfoProvider.getPoolSize(); + checkSpill(spillingInfoProvider, poolSize, builder); + checkRelease(spillingInfoProvider, poolSize, builder); return builder.build(); } @@ -99,8 +102,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { return builder.build(); } - private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) { - if (spillingInfoProvider.getNumTotalUnSpillBuffers() < numBuffersTriggerSpilling) { + private void checkSpill( + HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) { + if (spillingInfoProvider.getNumTotalUnSpillBuffers() + < numBuffersTriggerSpillingRatio * poolSize) { // In case situation changed since onBufferFinished() returns Optional#empty() return; } @@ -114,9 +119,8 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { } private void checkRelease( - HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) { - if (spillingInfoProvider.getNumTotalRequestedBuffers() - < spillingInfoProvider.getPoolSize() * releaseThreshold) { + HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) { + if (spillingInfoProvider.getNumTotalRequestedBuffers() < poolSize * releaseThreshold) { // In case situation changed since onMemoryUsageChanged() returns Optional#empty() return; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java index 22a9408a01f..4f45b491aa2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.SupplierWithException; @@ -40,6 +41,9 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -71,6 +75,17 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap = new ConcurrentHashMap<>(); + /** + * Currently, it is only used to regularly check the actual size of local buffer pool (the size + * will change dynamically due to the redistribution of network buffers). When the size of the + * buffer pool changes, it attempts to trigger the spilling strategy. + */ + private final ScheduledExecutorService poolSizeChecker = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("hybrid-shuffle-pool-size-checker-executor")); + + private final AtomicInteger poolSize; + public HsMemoryDataManager( int numSubpartitions, int bufferSize, @@ -78,7 +93,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex, Path dataFilePath, - BufferCompressor bufferCompressor) + BufferCompressor bufferCompressor, + long poolSizeCheckInterval) throws IOException { this.numSubpartitions = numSubpartitions; this.bufferPool = bufferPool; @@ -99,6 +115,22 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData bufferCompressor, this); } + + poolSize = new AtomicInteger(this.bufferPool.getNumBuffers()); + + if (poolSizeCheckInterval > 0) { + poolSizeChecker.scheduleAtFixedRate( + () -> { + int newSize = this.bufferPool.getNumBuffers(); + int oldSize = poolSize.getAndSet(newSize); + if (oldSize > newSize) { + callWithLock(() -> spillStrategy.decideActionWithGlobalInfo(this)); + } + }, + poolSizeCheckInterval, + poolSizeCheckInterval, + TimeUnit.MILLISECONDS); + } } // ------------------------------------ @@ -144,6 +176,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this)); handleDecision(Optional.of(decision)); spiller.close(); + poolSizeChecker.shutdown(); } /** @@ -168,7 +201,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData @Override public int getPoolSize() { - return bufferPool.getNumBuffers(); + return poolSize.get(); } @Override @@ -240,7 +273,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData @Override public void onBufferFinished() { Optional<Decision> decision = - spillStrategy.onBufferFinished(numUnSpillBuffers.incrementAndGet()); + spillStrategy.onBufferFinished(numUnSpillBuffers.incrementAndGet(), getPoolSize()); handleDecision(decision); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 5d537aa7466..9b75f1e3724 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -127,7 +127,8 @@ public class HsResultPartition extends ResultPartition { getSpillingStrategy(hybridShuffleConfiguration), dataIndex, dataFilePath, - bufferCompressor); + bufferCompressor, + hybridShuffleConfiguration.getBufferPoolSizeCheckIntervalMs()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java index dcd53393bdf..dc356cb20d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java @@ -45,7 +45,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { // For the case of buffer finished, there is no need to take action for // HsSelectiveSpillingStrategy. @Override - public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) { + public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) { return Optional.of(Decision.NO_ACTION); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java index fb4a5ab58d5..6d3a15d427a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java @@ -51,7 +51,7 @@ public interface HsSpillingStrategy { * @return A {@link Decision} based on the provided information, or {@link Optional#empty()} if * the decision cannot be made, which indicates global information is needed. */ - Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers); + Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize); /** * Make a decision when a buffer is consumed. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java index 18cadb71b05..f3df0b2ab5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java @@ -30,12 +30,14 @@ public class HybridShuffleConfiguration { private static final float DEFAULT_SELECTIVE_STRATEGY_SPILL_BUFFER_RATIO = 0.4f; - private static final int DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED = 10; + private static final float DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED_RATIO = 0.5f; private static final float DEFAULT_FULL_STRATEGY_RELEASE_THRESHOLD = 0.7f; private static final float DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO = 0.4f; + private static final long DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS = 1000; + private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME = SpillingStrategyType.FULL; @@ -57,31 +59,36 @@ public class HybridShuffleConfiguration { // ---------------------------------------- // Full Spilling Strategy // ---------------------------------------- - private final int fullStrategyNumBuffersTriggerSpilling; + private final float fullStrategyNumBuffersTriggerSpillingRatio; private final float fullStrategyReleaseThreshold; private final float fullStrategyReleaseBufferRatio; + private final long bufferPoolSizeCheckIntervalMs; + private HybridShuffleConfiguration( int maxBuffersReadAhead, Duration bufferRequestTimeout, int maxRequestedBuffers, float selectiveStrategySpillThreshold, float selectiveStrategySpillBufferRatio, - int fullStrategyNumBuffersTriggerSpilling, + float fullStrategyNumBuffersTriggerSpillingRatio, float fullStrategyReleaseThreshold, float fullStrategyReleaseBufferRatio, - SpillingStrategyType spillingStrategyType) { + SpillingStrategyType spillingStrategyType, + long bufferPoolSizeCheckIntervalMs) { this.maxBuffersReadAhead = maxBuffersReadAhead; this.bufferRequestTimeout = bufferRequestTimeout; this.maxRequestedBuffers = maxRequestedBuffers; this.selectiveStrategySpillThreshold = selectiveStrategySpillThreshold; this.selectiveStrategySpillBufferRatio = selectiveStrategySpillBufferRatio; - this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling; + this.fullStrategyNumBuffersTriggerSpillingRatio = + fullStrategyNumBuffersTriggerSpillingRatio; this.fullStrategyReleaseThreshold = fullStrategyReleaseThreshold; this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio; this.spillingStrategyType = spillingStrategyType; + this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs; } public static Builder builder(int numSubpartitions, int numBuffersPerRequest) { @@ -127,11 +134,11 @@ public class HybridShuffleConfiguration { } /** - * When the number of unSpilled buffers equal to this value, trigger the spilling operation. - * Used by {@link HsFullSpillingStrategy}. + * When the number of unSpilled buffers equal to this ratio times pool size, trigger the + * spilling operation. Used by {@link HsFullSpillingStrategy}. */ - public int getFullStrategyNumBuffersTriggerSpilling() { - return fullStrategyNumBuffersTriggerSpilling; + public float getFullStrategyNumBuffersTriggerSpillingRatio() { + return fullStrategyNumBuffersTriggerSpillingRatio; } /** @@ -147,6 +154,11 @@ public class HybridShuffleConfiguration { return fullStrategyReleaseBufferRatio; } + /** Check interval of buffer pool's size. */ + public long getBufferPoolSizeCheckIntervalMs() { + return bufferPoolSizeCheckIntervalMs; + } + /** Type of {@link HsSpillingStrategy}. */ public enum SpillingStrategyType { FULL, @@ -164,13 +176,15 @@ public class HybridShuffleConfiguration { private float selectiveStrategySpillBufferRatio = DEFAULT_SELECTIVE_STRATEGY_SPILL_BUFFER_RATIO; - private int fullStrategyNumBuffersTriggerSpilling = - DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED; + private float fullStrategyNumBuffersTriggerSpillingRatio = + DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED_RATIO; private float fullStrategyReleaseThreshold = DEFAULT_FULL_STRATEGY_RELEASE_THRESHOLD; private float fullStrategyReleaseBufferRatio = DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO; + private long bufferPoolSizeCheckIntervalMs = DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS; + private SpillingStrategyType spillingStrategyType = DEFAULT_SPILLING_STRATEGY_NAME; private final int numSubpartitions; @@ -203,9 +217,10 @@ public class HybridShuffleConfiguration { return this; } - public Builder setFullStrategyNumBuffersTriggerSpilling( - int fullStrategyNumBuffersTriggerSpilling) { - this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling; + public Builder setFullStrategyNumBuffersTriggerSpillingRatio( + float fullStrategyNumBuffersTriggerSpillingRatio) { + this.fullStrategyNumBuffersTriggerSpillingRatio = + fullStrategyNumBuffersTriggerSpillingRatio; return this; } @@ -224,6 +239,11 @@ public class HybridShuffleConfiguration { return this; } + public Builder setBufferPoolSizeCheckIntervalMs(long bufferPoolSizeCheckIntervalMs) { + this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs; + return this; + } + public HybridShuffleConfiguration build() { return new HybridShuffleConfiguration( maxBuffersReadAhead, @@ -231,10 +251,11 @@ public class HybridShuffleConfiguration { Math.max(2 * numBuffersPerRequest, numSubpartitions), selectiveStrategySpillThreshold, selectiveStrategySpillBufferRatio, - fullStrategyNumBuffersTriggerSpilling, + fullStrategyNumBuffersTriggerSpillingRatio, fullStrategyReleaseThreshold, fullStrategyReleaseBufferRatio, - spillingStrategyType); + spillingStrategyType, + bufferPoolSizeCheckIntervalMs); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index 44701cbab76..3602b15b2d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.entry; class HsFullSpillingStrategyTest { public static final int NUM_SUBPARTITIONS = 2; - public static final int NUM_BUFFERS_TRIGGER_SPILLING = 2; + public static final float NUM_BUFFERS_TRIGGER_SPILLING_RATIO = 0.2f; public static final float FULL_SPILL_RELEASE_THRESHOLD = 0.8f; @@ -47,24 +47,31 @@ class HsFullSpillingStrategyTest { private final HsSpillingStrategy spillStrategy = new HsFullSpillingStrategy( HybridShuffleConfiguration.builder(NUM_SUBPARTITIONS, 1) - .setFullStrategyNumBuffersTriggerSpilling(NUM_BUFFERS_TRIGGER_SPILLING) + .setFullStrategyNumBuffersTriggerSpillingRatio( + NUM_BUFFERS_TRIGGER_SPILLING_RATIO) .setFullStrategyReleaseThreshold(FULL_SPILL_RELEASE_THRESHOLD) .setFullStrategyReleaseBufferRatio(FULL_SPILL_RELEASE_RATIO) .build()); @Test void testOnBufferFinishedUnSpillBufferBelowThreshold() { + final int poolSize = 10; Optional<Decision> finishedDecision = - spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING - 1); + spillStrategy.onBufferFinished( + (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO) - 1, poolSize); assertThat(finishedDecision).hasValue(Decision.NO_ACTION); } @Test void testOnBufferFinishedUnSpillBufferEqualToOrGreatThenThreshold() { + final int poolSize = 10; Optional<Decision> finishedDecision = - spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING); + spillStrategy.onBufferFinished( + (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO), poolSize); assertThat(finishedDecision).isNotPresent(); - finishedDecision = spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING + 1); + finishedDecision = + spillStrategy.onBufferFinished( + (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO) + 1, poolSize); assertThat(finishedDecision).isNotPresent(); } @@ -125,7 +132,8 @@ class HsFullSpillingStrategyTest { .addConsumedBuffers(subpartition1, Arrays.asList(0, 1)) .addSpillBuffers(subpartition2, Arrays.asList(1, 2, 3)) .addConsumedBuffers(subpartition2, Arrays.asList(0, 1)) - .setGetNumTotalUnSpillBuffersSupplier(() -> NUM_BUFFERS_TRIGGER_SPILLING) + .setGetNumTotalUnSpillBuffersSupplier( + () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO)) .setGetNumTotalRequestedBuffersSupplier(() -> 10) .setGetPoolSizeSupplier(() -> 10) .setGetNextBufferIndexToConsumeSupplier( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java index 4cf1bc88c94..773b847ecc1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java @@ -65,7 +65,7 @@ class HsMemoryDataManagerTest { HsSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder() .setOnBufferFinishedFunction( - (numTotalUnSpillBuffers) -> { + (numTotalUnSpillBuffers, currentPoolSize) -> { finishedBuffers.incrementAndGet(); return Optional.of(Decision.NO_ACTION); }) @@ -120,7 +120,7 @@ class HsMemoryDataManagerTest { HsSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder() .setOnBufferFinishedFunction( - (numFinishedBuffers) -> { + (numFinishedBuffers, poolSize) -> { if (numFinishedBuffers < numFinishedBufferToTriggerDecision) { return Optional.of(Decision.NO_ACTION); } @@ -160,7 +160,7 @@ class HsMemoryDataManagerTest { HsSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder() .setOnBufferFinishedFunction( - (finishedBuffer) -> { + (finishedBuffer, poolSize) -> { // return empty optional to trigger global decision. return Optional.empty(); }) @@ -192,6 +192,34 @@ class HsMemoryDataManagerTest { assertThat(resultPartitionReleaseFuture).isCompleted(); } + @Test + void testPoolSizeCheck() throws Exception { + final int requiredBuffers = 10; + final int maxBuffers = 100; + CompletableFuture<Void> triggerGlobalDecision = new CompletableFuture<>(); + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(maxBuffers, bufferSize); + BufferPool bufferPool = networkBufferPool.createBufferPool(requiredBuffers, maxBuffers); + assertThat(bufferPool.getNumBuffers()).isEqualTo(maxBuffers); + + HsSpillingStrategy spillingStrategy = + TestingSpillingStrategy.builder() + .setDecideActionWithGlobalInfoFunction( + (spillingInfoProvider) -> { + assertThat(spillingInfoProvider.getPoolSize()) + .isEqualTo(requiredBuffers); + triggerGlobalDecision.complete(null); + return Decision.NO_ACTION; + }) + .build(); + + createMemoryDataManager(spillingStrategy, bufferPool); + networkBufferPool.createBufferPool(maxBuffers - requiredBuffers, maxBuffers); + assertThat(bufferPool.getNumBuffers()).isEqualTo(requiredBuffers); + + assertThat(triggerGlobalDecision).succeedsWithin(10, TimeUnit.SECONDS); + } + private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy) throws Exception { return createMemoryDataManager(spillStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS)); @@ -201,6 +229,18 @@ class HsMemoryDataManagerTest { HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex) throws Exception { NetworkBufferPool networkBufferPool = new NetworkBufferPool(NUM_BUFFERS, bufferSize); BufferPool bufferPool = networkBufferPool.createBufferPool(poolSize, poolSize); + return createMemoryDataManager(bufferPool, spillStrategy, fileDataIndex); + } + + private HsMemoryDataManager createMemoryDataManager( + HsSpillingStrategy spillingStrategy, BufferPool bufferPool) throws Exception { + return createMemoryDataManager( + bufferPool, spillingStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS)); + } + + private HsMemoryDataManager createMemoryDataManager( + BufferPool bufferPool, HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex) + throws Exception { HsMemoryDataManager memoryDataManager = new HsMemoryDataManager( NUM_SUBPARTITIONS, @@ -209,7 +249,8 @@ class HsMemoryDataManagerTest { spillStrategy, fileDataIndex, dataFilePath, - null); + null, + 1000); memoryDataManager.setOutputMetrics(createTestingOutputMetrics()); return memoryDataManager; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java index c1086b95f7d..826a7b243f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; -import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.IOUtils; @@ -380,33 +379,6 @@ class HsResultPartitionTest { return ByteBuffer.wrap(dataWritten); } - private HsResultPartition createHsResultPartition( - int numSubpartitions, BufferPool bufferPool, int numBuffersTriggerSpilling) - throws IOException { - HsResultPartition hsResultPartition = - new HsResultPartition( - "HsResultPartitionTest", - 0, - new ResultPartitionID(), - ResultPartitionType.HYBRID_FULL, - numSubpartitions, - numSubpartitions, - readBufferPool, - readIOExecutor, - new ResultPartitionManager(), - fileChannelManager.createChannel().getPath(), - bufferSize, - HybridShuffleConfiguration.builder( - numSubpartitions, readBufferPool.getNumBuffersPerRequest()) - .setSpillingStrategyType(SpillingStrategyType.FULL) - .setFullStrategyNumBuffersTriggerSpilling(numBuffersTriggerSpilling) - .build(), - null, - () -> bufferPool); - hsResultPartition.setup(); - return hsResultPartition; - } - private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool) throws IOException { HsResultPartition hsResultPartition = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java index dfeb7a0f425..04328fbc32b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java @@ -49,7 +49,7 @@ class HsSelectiveSpillingStrategyTest { @Test void testOnBufferFinished() { - Optional<Decision> finishedDecision = spillStrategy.onBufferFinished(5); + Optional<Decision> finishedDecision = spillStrategy.onBufferFinished(5, 10); assertThat(finishedDecision).hasValue(Decision.NO_ACTION); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java index aedb0602c83..451ab8980da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java @@ -114,7 +114,8 @@ class HsSubpartitionViewTest { spillingStrategy, new HsFileDataIndexImpl(1), dataFilePath.resolve(".data"), - null); + null, + 0); memoryDataManager.setOutputMetrics(createTestingOutputMetrics()); HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0, subpartitionView); subpartitionView.setMemoryDataView(hsDataView); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java index 61a730fc5c8..48fff99db7d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java @@ -26,7 +26,7 @@ import java.util.function.Function; public class TestingSpillingStrategy implements HsSpillingStrategy { private final BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction; - private final Function<Integer, Optional<Decision>> onBufferFinishedFunction; + private final BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction; private final Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction; @@ -36,7 +36,7 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { private TestingSpillingStrategy( BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction, - Function<Integer, Optional<Decision>> onBufferFinishedFunction, + BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction, Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction, Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction, Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) { @@ -54,8 +54,8 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { } @Override - public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) { - return onBufferFinishedFunction.apply(numTotalUnSpillBuffers); + public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) { + return onBufferFinishedFunction.apply(numTotalUnSpillBuffers, currentPoolSize); } @Override @@ -82,8 +82,8 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { private BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction = (ignore1, ignore2) -> Optional.of(Decision.NO_ACTION); - private Function<Integer, Optional<Decision>> onBufferFinishedFunction = - (ignore) -> Optional.of(Decision.NO_ACTION); + private BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction = + (ignore1, ignore2) -> Optional.of(Decision.NO_ACTION); private Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction = (ignore) -> Optional.of(Decision.NO_ACTION); @@ -103,7 +103,7 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { } public Builder setOnBufferFinishedFunction( - Function<Integer, Optional<Decision>> onBufferFinishedFunction) { + BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction) { this.onBufferFinishedFunction = onBufferFinishedFunction; return this; }