This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e2d50c4777d2d5ae8e6f5857427832b7460d3893 Author: Weijie Guo <res...@163.com> AuthorDate: Wed Jul 27 23:00:45 2022 +0800 [FLINK-27908] HybridShuffleConfiguration supports set spilling strategy type. --- .../hybrid/HybridShuffleConfiguration.java | 32 ++++++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java index 144e2ccea5a..18cadb71b05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java @@ -36,12 +36,17 @@ public class HybridShuffleConfiguration { private static final float DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO = 0.4f; + private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME = + SpillingStrategyType.FULL; + private final int maxBuffersReadAhead; private final Duration bufferRequestTimeout; private final int maxRequestedBuffers; + private final SpillingStrategyType spillingStrategyType; + // ---------------------------------------- // Selective Spilling Strategy // ---------------------------------------- @@ -66,7 +71,8 @@ public class HybridShuffleConfiguration { float selectiveStrategySpillBufferRatio, int fullStrategyNumBuffersTriggerSpilling, float fullStrategyReleaseThreshold, - float fullStrategyReleaseBufferRatio) { + float fullStrategyReleaseBufferRatio, + SpillingStrategyType spillingStrategyType) { this.maxBuffersReadAhead = maxBuffersReadAhead; this.bufferRequestTimeout = bufferRequestTimeout; this.maxRequestedBuffers = maxRequestedBuffers; @@ -75,12 +81,18 @@ public class HybridShuffleConfiguration { this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling; this.fullStrategyReleaseThreshold = fullStrategyReleaseThreshold; this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio; + this.spillingStrategyType = spillingStrategyType; } public static Builder builder(int numSubpartitions, int numBuffersPerRequest) { return new Builder(numSubpartitions, numBuffersPerRequest); } + /** Get {@link SpillingStrategyType} for hybrid shuffle mode. */ + public SpillingStrategyType getSpillingStrategyType() { + return spillingStrategyType; + } + public int getMaxRequestedBuffers() { return maxRequestedBuffers; } @@ -135,8 +147,14 @@ public class HybridShuffleConfiguration { return fullStrategyReleaseBufferRatio; } + /** Type of {@link HsSpillingStrategy}. */ + public enum SpillingStrategyType { + FULL, + SELECTIVE + } + /** Builder for {@link HybridShuffleConfiguration}. */ - static class Builder { + public static class Builder { private int maxBuffersReadAhead = DEFAULT_MAX_BUFFERS_READ_AHEAD; private Duration bufferRequestTimeout = DEFAULT_BUFFER_REQUEST_TIMEOUT; @@ -153,6 +171,8 @@ public class HybridShuffleConfiguration { private float fullStrategyReleaseBufferRatio = DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO; + private SpillingStrategyType spillingStrategyType = DEFAULT_SPILLING_STRATEGY_NAME; + private final int numSubpartitions; private final int numBuffersPerRequest; @@ -199,6 +219,11 @@ public class HybridShuffleConfiguration { return this; } + public Builder setSpillingStrategyType(SpillingStrategyType spillingStrategyType) { + this.spillingStrategyType = spillingStrategyType; + return this; + } + public HybridShuffleConfiguration build() { return new HybridShuffleConfiguration( maxBuffersReadAhead, @@ -208,7 +233,8 @@ public class HybridShuffleConfiguration { selectiveStrategySpillBufferRatio, fullStrategyNumBuffersTriggerSpilling, fullStrategyReleaseThreshold, - fullStrategyReleaseBufferRatio); + fullStrategyReleaseBufferRatio, + spillingStrategyType); } } }