This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2d50c4777d2d5ae8e6f5857427832b7460d3893
Author: Weijie Guo <res...@163.com>
AuthorDate: Wed Jul 27 23:00:45 2022 +0800

    [FLINK-27908] HybridShuffleConfiguration supports set spilling strategy 
type.
---
 .../hybrid/HybridShuffleConfiguration.java         | 32 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
index 144e2ccea5a..18cadb71b05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
@@ -36,12 +36,17 @@ public class HybridShuffleConfiguration {
 
     private static final float DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO = 
0.4f;
 
+    private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME =
+            SpillingStrategyType.FULL;
+
     private final int maxBuffersReadAhead;
 
     private final Duration bufferRequestTimeout;
 
     private final int maxRequestedBuffers;
 
+    private final SpillingStrategyType spillingStrategyType;
+
     // ----------------------------------------
     //        Selective Spilling Strategy
     // ----------------------------------------
@@ -66,7 +71,8 @@ public class HybridShuffleConfiguration {
             float selectiveStrategySpillBufferRatio,
             int fullStrategyNumBuffersTriggerSpilling,
             float fullStrategyReleaseThreshold,
-            float fullStrategyReleaseBufferRatio) {
+            float fullStrategyReleaseBufferRatio,
+            SpillingStrategyType spillingStrategyType) {
         this.maxBuffersReadAhead = maxBuffersReadAhead;
         this.bufferRequestTimeout = bufferRequestTimeout;
         this.maxRequestedBuffers = maxRequestedBuffers;
@@ -75,12 +81,18 @@ public class HybridShuffleConfiguration {
         this.fullStrategyNumBuffersTriggerSpilling = 
fullStrategyNumBuffersTriggerSpilling;
         this.fullStrategyReleaseThreshold = fullStrategyReleaseThreshold;
         this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio;
+        this.spillingStrategyType = spillingStrategyType;
     }
 
     public static Builder builder(int numSubpartitions, int 
numBuffersPerRequest) {
         return new Builder(numSubpartitions, numBuffersPerRequest);
     }
 
+    /** Get {@link SpillingStrategyType} for hybrid shuffle mode. */
+    public SpillingStrategyType getSpillingStrategyType() {
+        return spillingStrategyType;
+    }
+
     public int getMaxRequestedBuffers() {
         return maxRequestedBuffers;
     }
@@ -135,8 +147,14 @@ public class HybridShuffleConfiguration {
         return fullStrategyReleaseBufferRatio;
     }
 
+    /** Type of {@link HsSpillingStrategy}. */
+    public enum SpillingStrategyType {
+        FULL,
+        SELECTIVE
+    }
+
     /** Builder for {@link HybridShuffleConfiguration}. */
-    static class Builder {
+    public static class Builder {
         private int maxBuffersReadAhead = DEFAULT_MAX_BUFFERS_READ_AHEAD;
 
         private Duration bufferRequestTimeout = DEFAULT_BUFFER_REQUEST_TIMEOUT;
@@ -153,6 +171,8 @@ public class HybridShuffleConfiguration {
 
         private float fullStrategyReleaseBufferRatio = 
DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO;
 
+        private SpillingStrategyType spillingStrategyType = 
DEFAULT_SPILLING_STRATEGY_NAME;
+
         private final int numSubpartitions;
 
         private final int numBuffersPerRequest;
@@ -199,6 +219,11 @@ public class HybridShuffleConfiguration {
             return this;
         }
 
+        public Builder setSpillingStrategyType(SpillingStrategyType 
spillingStrategyType) {
+            this.spillingStrategyType = spillingStrategyType;
+            return this;
+        }
+
         public HybridShuffleConfiguration build() {
             return new HybridShuffleConfiguration(
                     maxBuffersReadAhead,
@@ -208,7 +233,8 @@ public class HybridShuffleConfiguration {
                     selectiveStrategySpillBufferRatio,
                     fullStrategyNumBuffersTriggerSpilling,
                     fullStrategyReleaseThreshold,
-                    fullStrategyReleaseBufferRatio);
+                    fullStrategyReleaseBufferRatio,
+                    spillingStrategyType);
         }
     }
 }

Reply via email to