This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 11ba5b3b26e146d80e1319d77d1ee5ef2dd261d1
Author: kevin.cyj <kevin....@alibaba-inc.com>
AuthorDate: Mon Oct 12 20:45:49 2020 +0800

    [FLINK-19602][network] Introduce new config options to enable sort-merge 
based blocking shuffle
    
    Two new config options are added to control the behavior of the sort-merge 
based blocking shuffle:
    
    1. taskmanager.network.sort-shuffle.min-buffers: Minimum number of network 
buffers required per sort-merge blocking result partition.
    2. taskmanager.network.sort-shuffle.min-parallelism: Parallelism threshold 
to switch between sort-merge blocking shuffle and the default hash-based 
blocking shuffle.
    
    With the default values of these new config options, the existing behavior
    of blocking shuffle stays unchanged.
---
 .../generated/all_taskmanager_network_section.html |  12 ++
 .../netty_shuffle_environment_configuration.html   |  12 ++
 docs/ops/memory/mem_tuning.md                      |  20 ++++
 docs/ops/memory/mem_tuning.zh.md                   |  17 +++
 .../NettyShuffleEnvironmentOptions.java            |  30 +++++
 .../io/network/NettyShuffleServiceFactory.java     |   4 +-
 .../network/partition/ResultPartitionFactory.java  |  68 ++++++++----
 .../NettyShuffleEnvironmentConfiguration.java      |  33 +++++-
 .../io/network/NettyShuffleEnvironmentBuilder.java |  18 ++-
 .../network/partition/ResultPartitionBuilder.java  |  22 +++-
 .../partition/ResultPartitionFactoryTest.java      |  29 ++++-
 .../flink/test/runtime/BlockingShuffleITCase.java  | 123 +++++++++++++++++++++
 .../flink/test/runtime/JobGraphRunningUtil.java    |  62 +++++++++++
 .../test/runtime/ShuffleCompressionITCase.java     |  33 +-----
 14 files changed, 423 insertions(+), 60 deletions(-)

diff --git a/docs/_includes/generated/all_taskmanager_network_section.html 
b/docs/_includes/generated/all_taskmanager_network_section.html
index 6c2a11b..5e5336d 100644
--- a/docs/_includes/generated/all_taskmanager_network_section.html
+++ b/docs/_includes/generated/all_taskmanager_network_section.html
@@ -104,5 +104,17 @@
             <td>Integer</td>
             <td>The number of retry attempts for network communication. 
Currently it's only used for establishing input/output channel connections</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td>
+            <td style="word-wrap: break-word;">64</td>
+            <td>Integer</td>
+            <td>Minimum number of network buffers required per sort-merge 
blocking result partition. For large scale batch jobs, it is suggested to 
increase this config value to improve compression ratio and reduce small 
network packets. Note: to increase this config value, you may also need to 
increase the size of total network memory to avoid "insufficient number of 
network buffers" error.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
+            <td style="word-wrap: break-word;">2147483647</td>
+            <td>Integer</td>
+            <td>Parallelism threshold to switch between sort-merge blocking 
shuffle and the default hash-based blocking shuffle, which means for small 
parallelism, hash-based blocking shuffle will be used and for large 
parallelism, sort-merge blocking shuffle will be used. Note: sort-merge 
blocking shuffle uses unmanaged direct memory for shuffle data writing and 
reading so just increase the size of direct memory if direct memory OOM error 
occurs.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/docs/_includes/generated/netty_shuffle_environment_configuration.html 
b/docs/_includes/generated/netty_shuffle_environment_configuration.html
index 018296a..9d54dca 100644
--- a/docs/_includes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/_includes/generated/netty_shuffle_environment_configuration.html
@@ -122,5 +122,17 @@
             <td>Integer</td>
             <td>The number of retry attempts for network communication. 
Currently it's only used for establishing input/output channel connections</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td>
+            <td style="word-wrap: break-word;">64</td>
+            <td>Integer</td>
+            <td>Minimum number of network buffers required per sort-merge 
blocking result partition. For large scale batch jobs, it is suggested to 
increase this config value to improve compression ratio and reduce small 
network packets. Note: to increase this config value, you may also need to 
increase the size of total network memory to avoid "insufficient number of 
network buffers" error.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
+            <td style="word-wrap: break-word;">2147483647</td>
+            <td>Integer</td>
+            <td>Parallelism threshold to switch between sort-merge blocking 
shuffle and the default hash-based blocking shuffle, which means for small 
parallelism, hash-based blocking shuffle will be used and for large 
parallelism, sort-merge blocking shuffle will be used. Note: sort-merge 
blocking shuffle uses unmanaged direct memory for shuffle data writing and 
reading so just increase the size of direct memory if direct memory OOM error 
occurs.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/ops/memory/mem_tuning.md b/docs/ops/memory/mem_tuning.md
index bbb210c..2cbbdbc 100644
--- a/docs/ops/memory/mem_tuning.md
+++ b/docs/ops/memory/mem_tuning.md
@@ -88,3 +88,23 @@ on the performance of your applications. Flink will attempt 
to allocate and use
 as configured for batch jobs but not go beyond its limits. This prevents 
`OutOfMemoryError`'s because Flink knows precisely
 how much memory it has to leverage. If the [managed 
memory](../memory/mem_setup_tm.html#managed-memory) is not sufficient,
 Flink will gracefully spill to disk.
+
+## Configure memory for sort-merge blocking shuffle
+
+The number of network buffers required per sort-merge blocking result partition is controlled by
+[taskmanager.network.sort-shuffle.min-buffers](../config.html#taskmanager-network-sort-shuffle-min-buffers).
+Its default value, 64, is quite small: it works for arbitrary parallelism, but the performance may
+not be the best. For large-scale jobs, it is suggested to increase this value to improve the
+compression ratio and reduce the number of small network packets, which benefits performance. To
+increase this value, you may also need to increase the size of the total network memory by adjusting
+[taskmanager.memory.network.fraction](../config.html#taskmanager-memory-network-fraction),
+[taskmanager.memory.network.min](../config.html#taskmanager-memory-network-min) and
+[taskmanager.memory.network.max](../config.html#taskmanager-memory-network-max) to avoid the
+`insufficient number of network buffers` error.
+
+In addition to network memory, the sort-merge blocking shuffle implementation also uses some unmanaged
+direct memory for shuffle data writing and reading. So, to use the sort-merge blocking shuffle, you may
+need to reserve some direct memory for it by increasing the config value of
+[taskmanager.memory.task.off-heap.size](../config.html#taskmanager-memory-task-off-heap-size). If a
+direct memory OOM error occurs after you enable the sort-merge blocking shuffle, keep increasing this
+value until the OOM error disappears.
diff --git a/docs/ops/memory/mem_tuning.zh.md b/docs/ops/memory/mem_tuning.zh.md
index e4f8c99..1533012 100644
--- a/docs/ops/memory/mem_tuning.zh.md
+++ b/docs/ops/memory/mem_tuning.zh.md
@@ -85,3 +85,20 @@ Flink 批处理算子使用[托管内存](../memory/mem_setup_tm.html#managed-me
 因此 Flink 
会在不超过其配置限额的前提下,尽可能分配更多的[托管内存](../memory/mem_setup_tm.html#managed-memory)。
 Flink 明确知道可以使用的内存大小,因此可以有效避免 `OutOfMemoryError` 的发生。
 当[托管内存](../memory/mem_setup_tm.html#managed-memory)不足时,Flink 会优雅地将数据落盘。
+
+## SortMerge数据Shuffle内存配置
+
+对于SortMerge数据Shuffle,每个ResultPartition需要的网络缓冲区(Buffer)数目是由
+[taskmanager.network.sort-shuffle.min-buffers](../config.html#taskmanager-network-sort-shuffle-min-buffers)这个配置决定的。
+它的默认值是64,是比较小的。虽然64个网络Buffer已经可以支持任意规模的并发,但性能可能不是最好的。对于大并发的作业,
+通过增大这个配置值,可以提高落盘数据的压缩率并且减少网络小包的数量,从而有利于提高Shuffle性能。为了增大这个配置值,
+你可能需要通过调整[taskmanager.memory.network.fraction](../config.html#taskmanager-memory-network-fraction),
+[taskmanager.memory.network.min](../config.html#taskmanager-memory-network-min)和
+[taskmanager.memory.network.max](../config.html#taskmanager-memory-network-max)这三个参数来增大总的网络内存大小,
+从而避免出现`insufficient number of network buffers`错误。
+
+除了网络内存,SortMerge数据Shuffle还需要使用一些JVM Direct Memory来进行Shuffle数据的写出与读取。所以,
+为了使用SortMerge数据Shuffle,你可能还需要通过增大这个配置值
+[taskmanager.memory.task.off-heap.size](../config.html#taskmanager-memory-task-off-heap-size)来为其预留一些JVM Direct Memory。
+如果在你开启SortMerge数据Shuffle之后出现了Direct Memory OOM的错误,你只需要继续加大上面的配置值来预留更多的Direct Memory,
+直到不再发生Direct Memory OOM的错误为止。
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 584bc6e..f8c71c6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -174,6 +174,36 @@ public class NettyShuffleEnvironmentOptions {
                                " increased in case of higher round trip times 
between nodes and/or larger number of machines in the cluster.");
 
        /**
+        * Minimum number of network buffers required per sort-merge blocking 
result partition.
+        */
+       @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+       public static final ConfigOption<Integer> 
NETWORK_SORT_SHUFFLE_MIN_BUFFERS =
+               key("taskmanager.network.sort-shuffle.min-buffers")
+                       .intType()
+                       .defaultValue(64)
+                       .withDescription("Minimum number of network buffers 
required per sort-merge blocking "
+                               + "result partition. For large scale batch 
jobs, it is suggested to increase this"
+                               + " config value to improve compression ratio 
and reduce small network packets. "
+                               + "Note: to increase this config value, you may 
also need to increase the size of "
+                               + "total network memory to avoid \"insufficient 
number of network buffers\" error.");
+
+       /**
+        * Parallelism threshold to switch between sort-merge based blocking 
shuffle and the default
+        * hash-based blocking shuffle.
+        */
+       @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+       public static final ConfigOption<Integer> 
NETWORK_SORT_SHUFFLE_MIN_PARALLELISM =
+               key("taskmanager.network.sort-shuffle.min-parallelism")
+                       .intType()
+                       .defaultValue(Integer.MAX_VALUE)
+                       .withDescription("Parallelism threshold to switch 
between sort-merge blocking shuffle "
+                               + "and the default hash-based blocking shuffle, 
which means for small parallelism,"
+                               + " hash-based blocking shuffle will be used 
and for large parallelism, sort-merge"
+                               + " blocking shuffle will be used. Note: 
sort-merge blocking shuffle uses unmanaged"
+                               + " direct memory for shuffle data writing and 
reading so just increase the size of"
+                               + " direct memory if direct memory OOM error 
occurs.");
+
+       /**
         * Number of max buffers can be used for each output subparition.
         */
        @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index d5863d7..ecd8027 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -126,7 +126,9 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        config.networkBufferSize(),
                        config.isBlockingShuffleCompressionEnabled(),
                        config.getCompressionCodec(),
-                       config.getMaxBuffersPerChannel());
+                       config.getMaxBuffersPerChannel(),
+                       config.sortShuffleMinBuffers(),
+                       config.sortShuffleMinParallelism());
 
                SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
                        taskExecutorResourceId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index c6a3a4e..c83eaff 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -64,6 +64,10 @@ public class ResultPartitionFactory {
 
        private final int maxBuffersPerChannel;
 
+       private final int sortShuffleMinBuffers;
+
+       private final int sortShuffleMinParallelism;
+
        public ResultPartitionFactory(
                ResultPartitionManager partitionManager,
                FileChannelManager channelManager,
@@ -74,7 +78,9 @@ public class ResultPartitionFactory {
                int networkBufferSize,
                boolean blockingShuffleCompressionEnabled,
                String compressionCodec,
-               int maxBuffersPerChannel) {
+               int maxBuffersPerChannel,
+               int sortShuffleMinBuffers,
+               int sortShuffleMinParallelism) {
 
                this.partitionManager = partitionManager;
                this.channelManager = channelManager;
@@ -86,6 +92,8 @@ public class ResultPartitionFactory {
                this.blockingShuffleCompressionEnabled = 
blockingShuffleCompressionEnabled;
                this.compressionCodec = compressionCodec;
                this.maxBuffersPerChannel = maxBuffersPerChannel;
+               this.sortShuffleMinBuffers = sortShuffleMinBuffers;
+               this.sortShuffleMinParallelism = sortShuffleMinParallelism;
        }
 
        public ResultPartition create(
@@ -146,25 +154,40 @@ public class ResultPartitionFactory {
                        partition = pipelinedPartition;
                }
                else if (type == ResultPartitionType.BLOCKING || type == 
ResultPartitionType.BLOCKING_PERSISTENT) {
-                       final BoundedBlockingResultPartition blockingPartition 
= new BoundedBlockingResultPartition(
-                               taskNameWithSubtaskAndId,
-                               partitionIndex,
-                               id,
-                               type,
-                               subpartitions,
-                               maxParallelism,
-                               partitionManager,
-                               bufferCompressor,
-                               bufferPoolFactory);
-
-                       initializeBoundedBlockingPartitions(
-                               subpartitions,
-                               blockingPartition,
-                               blockingSubpartitionType,
-                               networkBufferSize,
-                               channelManager);
-
-                       partition = blockingPartition;
+                       if (numberOfSubpartitions >= sortShuffleMinParallelism) 
{
+                               partition = new SortMergeResultPartition(
+                                       taskNameWithSubtaskAndId,
+                                       partitionIndex,
+                                       id,
+                                       type,
+                                       subpartitions.length,
+                                       maxParallelism,
+                                       networkBufferSize,
+                                       partitionManager,
+                                       
channelManager.createChannel().getPath(),
+                                       bufferCompressor,
+                                       bufferPoolFactory);
+                       } else {
+                               final BoundedBlockingResultPartition 
blockingPartition = new BoundedBlockingResultPartition(
+                                       taskNameWithSubtaskAndId,
+                                       partitionIndex,
+                                       id,
+                                       type,
+                                       subpartitions,
+                                       maxParallelism,
+                                       partitionManager,
+                                       bufferCompressor,
+                                       bufferPoolFactory);
+
+                               initializeBoundedBlockingPartitions(
+                                       subpartitions,
+                                       blockingPartition,
+                                       blockingSubpartitionType,
+                                       networkBufferSize,
+                                       channelManager);
+
+                               partition = blockingPartition;
+                       }
                }
                else {
                        throw new IllegalArgumentException("Unrecognized 
ResultPartitionType: " + type);
@@ -224,10 +247,13 @@ public class ResultPartitionFactory {
                return () -> {
                        int maxNumberOfMemorySegments = type.isBounded() ?
                                numberOfSubpartitions * 
networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+                       int numRequiredBuffers = !type.isPipelined() && 
numberOfSubpartitions >= sortShuffleMinParallelism ?
+                               sortShuffleMinBuffers : numberOfSubpartitions + 
1;
+
                        // If the partition type is back pressure-free, we 
register with the buffer pool for
                        // callbacks to release memory.
                        return bufferPoolFactory.createBufferPool(
-                               numberOfSubpartitions + 1,
+                               numRequiredBuffers,
                                maxNumberOfMemorySegments,
                                numberOfSubpartitions,
                                maxBuffersPerChannel);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 09505dc..8f32660 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -58,6 +58,10 @@ public class NettyShuffleEnvironmentConfiguration {
        /** Number of extra network buffers to use for each outgoing/incoming 
gate (result partition/input gate). */
        private final int floatingNetworkBuffersPerGate;
 
+       private final int sortShuffleMinBuffers;
+
+       private final int sortShuffleMinParallelism;
+
        private final Duration requestSegmentsTimeout;
 
        private final boolean isNetworkDetailedMetrics;
@@ -88,7 +92,9 @@ public class NettyShuffleEnvironmentConfiguration {
                        BoundedBlockingSubpartitionType 
blockingSubpartitionType,
                        boolean blockingShuffleCompressionEnabled,
                        String compressionCodec,
-                       int maxBuffersPerChannel) {
+                       int maxBuffersPerChannel,
+                       int sortShuffleMinBuffers,
+                       int sortShuffleMinParallelism) {
 
                this.numNetworkBuffers = numNetworkBuffers;
                this.networkBufferSize = networkBufferSize;
@@ -104,6 +110,8 @@ public class NettyShuffleEnvironmentConfiguration {
                this.blockingShuffleCompressionEnabled = 
blockingShuffleCompressionEnabled;
                this.compressionCodec = 
Preconditions.checkNotNull(compressionCodec);
                this.maxBuffersPerChannel = maxBuffersPerChannel;
+               this.sortShuffleMinBuffers = sortShuffleMinBuffers;
+               this.sortShuffleMinParallelism = sortShuffleMinParallelism;
        }
 
        // 
------------------------------------------------------------------------
@@ -132,6 +140,14 @@ public class NettyShuffleEnvironmentConfiguration {
                return floatingNetworkBuffersPerGate;
        }
 
+       public int sortShuffleMinBuffers() {
+               return sortShuffleMinBuffers;
+       }
+
+       public int sortShuffleMinParallelism() {
+               return sortShuffleMinParallelism;
+       }
+
        public Duration getRequestSegmentsTimeout() {
                return requestSegmentsTimeout;
        }
@@ -201,6 +217,11 @@ public class NettyShuffleEnvironmentConfiguration {
 
                int maxBuffersPerChannel = 
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL);
 
+               int sortShuffleMinBuffers = configuration.getInteger(
+                       
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
+               int sortShuffleMinParallelism = configuration.getInteger(
+                       
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM);
+
                boolean isNetworkDetailedMetrics = 
configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
                String[] tempDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
@@ -228,7 +249,9 @@ public class NettyShuffleEnvironmentConfiguration {
                        blockingSubpartitionType,
                        blockingShuffleCompressionEnabled,
                        compressionCodec,
-                       maxBuffersPerChannel);
+                       maxBuffersPerChannel,
+                       sortShuffleMinBuffers,
+                       sortShuffleMinParallelism);
        }
 
        /**
@@ -350,6 +373,8 @@ public class NettyShuffleEnvironmentConfiguration {
                result = 31 * result + (blockingShuffleCompressionEnabled ? 1 : 
0);
                result = 31 * result + Objects.hashCode(compressionCodec);
                result = 31 * result + maxBuffersPerChannel;
+               result = 31 * result + sortShuffleMinBuffers;
+               result = 31 * result + sortShuffleMinParallelism;
                return result;
        }
 
@@ -370,6 +395,8 @@ public class NettyShuffleEnvironmentConfiguration {
                                        this.partitionRequestMaxBackoff == 
that.partitionRequestMaxBackoff &&
                                        this.networkBuffersPerChannel == 
that.networkBuffersPerChannel &&
                                        this.floatingNetworkBuffersPerGate == 
that.floatingNetworkBuffersPerGate &&
+                                       this.sortShuffleMinBuffers == 
that.sortShuffleMinBuffers &&
+                                       this.sortShuffleMinParallelism == 
that.sortShuffleMinParallelism &&
                                        
this.requestSegmentsTimeout.equals(that.requestSegmentsTimeout) &&
                                        (nettyConfig != null ? 
nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
                                        Arrays.equals(this.tempDirs, 
that.tempDirs) &&
@@ -394,6 +421,8 @@ public class NettyShuffleEnvironmentConfiguration {
                                ", blockingShuffleCompressionEnabled=" + 
blockingShuffleCompressionEnabled +
                                ", compressionCodec=" + compressionCodec +
                                ", maxBuffersPerChannel=" + 
maxBuffersPerChannel +
+                               ", sortShuffleMinBuffers=" + 
sortShuffleMinBuffers +
+                               ", sortShuffleMinParallelism=" + 
sortShuffleMinParallelism +
                                '}';
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 1d625fc..08d2481d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -54,6 +54,10 @@ public class NettyShuffleEnvironmentBuilder {
 
        private int floatingNetworkBuffersPerGate = 8;
 
+       private int sortShuffleMinBuffers = 100;
+
+       private int sortShuffleMinParallelism = Integer.MAX_VALUE;
+
        private int maxBuffersPerChannel = Integer.MAX_VALUE;
 
        private boolean blockingShuffleCompressionEnabled = false;
@@ -110,6 +114,16 @@ public class NettyShuffleEnvironmentBuilder {
                return this;
        }
 
+       public NettyShuffleEnvironmentBuilder setSortShuffleMinBuffers(int 
sortShuffleMinBuffers) {
+               this.sortShuffleMinBuffers = sortShuffleMinBuffers;
+               return this;
+       }
+
+       public NettyShuffleEnvironmentBuilder setSortShuffleMinParallelism(int 
sortShuffleMinParallelism) {
+               this.sortShuffleMinParallelism = sortShuffleMinParallelism;
+               return this;
+       }
+
        public NettyShuffleEnvironmentBuilder 
setBlockingShuffleCompressionEnabled(boolean blockingShuffleCompressionEnabled) 
{
                this.blockingShuffleCompressionEnabled = 
blockingShuffleCompressionEnabled;
                return this;
@@ -156,7 +170,9 @@ public class NettyShuffleEnvironmentBuilder {
                                BoundedBlockingSubpartitionType.AUTO,
                                blockingShuffleCompressionEnabled,
                                compressionCodec,
-                               maxBuffersPerChannel),
+                               maxBuffersPerChannel,
+                               sortShuffleMinBuffers,
+                               sortShuffleMinParallelism),
                        taskManagerLocation,
                        new TaskEventDispatcher(),
                        resultPartitionManager,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 826df0b..67e7adb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -55,6 +55,10 @@ public class ResultPartitionBuilder {
 
        private int floatingNetworkBuffersPerGate = 1;
 
+       private int sortShuffleMinBuffers = 100;
+
+       private int sortShuffleMinParallelism = Integer.MAX_VALUE;
+
        private int maxBuffersPerChannel = Integer.MAX_VALUE;
 
        private int networkBufferSize = 1;
@@ -105,7 +109,9 @@ public class ResultPartitionBuilder {
                return 
setNetworkBuffersPerChannel(environment.getConfiguration().networkBuffersPerChannel())
                        
.setFloatingNetworkBuffersPerGate(environment.getConfiguration().floatingNetworkBuffersPerGate())
                        
.setNetworkBufferSize(environment.getConfiguration().networkBufferSize())
-                       
.setNetworkBufferPool(environment.getNetworkBufferPool());
+                       
.setNetworkBufferPool(environment.getNetworkBufferPool())
+                       
.setSortShuffleMinBuffers(environment.getConfiguration().sortShuffleMinBuffers())
+                       
.setSortShuffleMinParallelism(environment.getConfiguration().sortShuffleMinParallelism());
        }
 
        public ResultPartitionBuilder setNetworkBufferPool(NetworkBufferPool 
networkBufferPool) {
@@ -139,6 +145,16 @@ public class ResultPartitionBuilder {
                return this;
        }
 
+       public ResultPartitionBuilder setSortShuffleMinBuffers(int 
sortShuffleMinBuffers) {
+               this.sortShuffleMinBuffers = sortShuffleMinBuffers;
+               return this;
+       }
+
+       public ResultPartitionBuilder setSortShuffleMinParallelism(int 
sortShuffleMinParallelism) {
+               this.sortShuffleMinParallelism = sortShuffleMinParallelism;
+               return this;
+       }
+
        public ResultPartitionBuilder setCompressionCodec(String 
compressionCodec) {
                this.compressionCodec = compressionCodec;
                return this;
@@ -161,7 +177,9 @@ public class ResultPartitionBuilder {
                        networkBufferSize,
                        blockingShuffleCompressionEnabled,
                        compressionCodec,
-                       maxBuffersPerChannel);
+                       maxBuffersPerChannel,
+                       sortShuffleMinBuffers,
+                       sortShuffleMinParallelism);
 
                SupplierWithException<BufferPool, IOException> factory = 
bufferPoolFactory.orElseGet(() ->
                        
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, 
partitionType));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 903878b..40297f6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -72,7 +72,13 @@ public class ResultPartitionFactoryTest extends TestLogger {
        }
 
        @Test
-       public void testConsumptionOnReleaseForPipelined() {
+       public void testSortMergePartitionCreated() {
+               ResultPartition resultPartition = 
createResultPartition(ResultPartitionType.BLOCKING, 1);
+               assertTrue(resultPartition instanceof SortMergeResultPartition);
+       }
+
+       @Test
+       public void testReleaseOnConsumptionForPipelinedPartition() {
                final ResultPartition resultPartition = 
createResultPartition(ResultPartitionType.PIPELINED);
 
                resultPartition.onConsumedSubpartition(0);
@@ -81,7 +87,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
        }
 
        @Test
-       public void testNoConsumptionOnReleaseForBlocking() {
+       public void testNoReleaseOnConsumptionForBoundedBlockingPartition() {
                final ResultPartition resultPartition = 
createResultPartition(ResultPartitionType.BLOCKING);
 
                resultPartition.onConsumedSubpartition(0);
@@ -89,7 +95,22 @@ public class ResultPartitionFactoryTest extends TestLogger {
                assertFalse(resultPartition.isReleased());
        }
 
+       @Test
+       public void testNoReleaseOnConsumptionForSortMergePartition() {
+               final ResultPartition sortMergePartition = createResultPartition(ResultPartitionType.BLOCKING, 1);
+               // consuming a subpartition of this blocking partition must not release it
+               sortMergePartition.onConsumedSubpartition(0);
+
+               assertFalse(sortMergePartition.isReleased());
+       }
+
        private static ResultPartition 
createResultPartition(ResultPartitionType partitionType) {
+               return createResultPartition(partitionType, Integer.MAX_VALUE); // MAX_VALUE min parallelism presumably keeps sort-merge off (TODO confirm)
+       }
+
+       private static ResultPartition createResultPartition(
+                       ResultPartitionType partitionType,
+                       int sortShuffleMinParallelism) {
                final ResultPartitionManager manager = new 
ResultPartitionManager();
 
                final ResultPartitionFactory factory = new 
ResultPartitionFactory(
@@ -102,7 +123,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
                        SEGMENT_SIZE,
                        false,
                        "LZ4",
-                       Integer.MAX_VALUE);
+                       Integer.MAX_VALUE,
+                       10,
+                       sortShuffleMinParallelism);
 
                final ResultPartitionDeploymentDescriptor descriptor = new 
ResultPartitionDeploymentDescriptor(
                        PartitionDescriptorBuilder
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
new file mode 100644
index 0000000..74c3aef
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for blocking shuffle. The same job, with all edges blocking, is run with
+ * the default bounded blocking shuffle and with the sort-merge blocking shuffle, each with
+ * and without data.
+ */
+public class BlockingShuffleITCase {
+
+       private static final String RECORD = "hello, world!";
+
+       private static final int NUM_TASK_MANAGERS = 2;
+
+       private static final int NUM_SLOTS_PER_TASK_MANAGER = 4;
+
+       /** Number of records emitted by each parallel source instance in the non-empty tests. */
+       private static final int NUM_RECORDS_TO_SEND = 1000000;
+
+       @Test
+       public void testBoundedBlockingShuffle() throws Exception {
+               runJob(new Configuration(), NUM_RECORDS_TO_SEND);
+       }
+
+       @Test
+       public void testBoundedBlockingShuffleWithoutData() throws Exception {
+               runJob(new Configuration(), 0);
+       }
+
+       @Test
+       public void testSortMergeBlockingShuffle() throws Exception {
+               runJob(createSortMergeConfiguration(), NUM_RECORDS_TO_SEND);
+       }
+
+       @Test
+       public void testSortMergeBlockingShuffleWithoutData() throws Exception {
+               runJob(createSortMergeConfiguration(), 0);
+       }
+
+       /**
+        * Creates a configuration that lowers the sort-shuffle min-parallelism threshold to 1, so
+        * that the job's blocking result partitions use the sort-merge implementation.
+        */
+       private static Configuration createSortMergeConfiguration() {
+               Configuration configuration = new Configuration();
+               configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+               return configuration;
+       }
+
+       /** Builds the test job and runs it on a MiniCluster with the given configuration. */
+       private static void runJob(Configuration configuration, int numRecordsToSend) throws Exception {
+               JobGraph jobGraph = createJobGraph(numRecordsToSend);
+               JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASK_MANAGERS, NUM_SLOTS_PER_TASK_MANAGER);
+       }
+
+       /**
+        * Creates a job (source, rebalanced map, broadcast sink) whose edges are all blocking and
+        * whose tasks are scheduled lazily from the sources.
+        */
+       private static JobGraph createJobGraph(int numRecordsToSend) {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(NUM_TASK_MANAGERS * NUM_SLOTS_PER_TASK_MANAGER);
+               DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
+               source
+                       .rebalance().map((MapFunction<String, String>) value -> value)
+                       .broadcast().addSink(new VerifySink());
+
+               StreamGraph streamGraph = env.getStreamGraph();
+               streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+               streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+               return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+       }
+
+       /** Parallel source in which every instance emits {@code RECORD} a fixed number of times. */
+       private static class StringSource implements ParallelSourceFunction<String> {
+               private volatile boolean isRunning = true;
+               private final int numRecordsToSend;
+
+               StringSource(int numRecordsToSend) {
+                       this.numRecordsToSend = numRecordsToSend;
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       // count with a local so the configured record count is never mutated
+                       for (int i = 0; isRunning && i < numRecordsToSend; i++) {
+                               ctx.collect(RECORD);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+       }
+
+       /** Sink that fails the job if it receives anything other than {@code RECORD}. */
+       private static class VerifySink implements SinkFunction<String> {
+
+               @Override
+               public void invoke(String value) throws Exception {
+                       assertEquals(RECORD, value);
+               }
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/JobGraphRunningUtil.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/JobGraphRunningUtil.java
new file mode 100644
index 0000000..f2c93a3
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/JobGraphRunningUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+/**
+ * Utils to run {@link JobGraph} on {@link MiniCluster}.
+ */
+public final class JobGraphRunningUtil {
+
+       /** Total Flink memory given to the TaskManagers unless the caller configures it explicitly. */
+       private static final MemorySize DEFAULT_TOTAL_FLINK_MEMORY = MemorySize.parse("1g");
+
+       private JobGraphRunningUtil() {
+               // utility class, not meant to be instantiated
+       }
+
+       /**
+        * Runs the given job on a freshly started {@link MiniCluster} and waits for its result.
+        *
+        * <p>The passed configuration is copied, so the caller's instance is not modified. If it
+        * does not set {@link TaskManagerOptions#TOTAL_FLINK_MEMORY}, 1g is used.
+        *
+        * @param jobGraph the job to submit
+        * @param configuration cluster configuration; copied, not modified
+        * @param numTaskManagers number of TaskManagers to start
+        * @param numSlotsPerTaskManager number of slots per TaskManager
+        * @throws AssertionError if the job result carries a failure
+        * @throws Exception if starting the cluster, submitting the job or fetching its result fails
+        */
+       public static void execute(
+                       JobGraph jobGraph,
+                       Configuration configuration,
+                       int numTaskManagers,
+                       int numSlotsPerTaskManager) throws Exception {
+               final Configuration clusterConfiguration = new Configuration(configuration);
+               if (!clusterConfiguration.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
+                       clusterConfiguration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, DEFAULT_TOTAL_FLINK_MEMORY);
+               }
+
+               final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+                       .setConfiguration(clusterConfiguration)
+                       .setNumTaskManagers(numTaskManagers)
+                       .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
+                       .build();
+
+               try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+                       miniCluster.start();
+
+                       MiniClusterClient miniClusterClient = new MiniClusterClient(clusterConfiguration, miniCluster);
+                       // wait for the submission to succeed
+                       JobID jobID = miniClusterClient.submitJob(jobGraph).get();
+
+                       JobResult jobResult = miniClusterClient.requestJobResult(jobID).get();
+                       if (jobResult.getSerializedThrowable().isPresent()) {
+                               throw new AssertionError(jobResult.getSerializedThrowable().get());
+                       }
+               }
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 3e118a2..ee1db78 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -20,12 +20,8 @@ package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -38,9 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.types.LongValue;
@@ -86,32 +79,12 @@ public class ShuffleCompressionITCase {
 
        @Test
        public void testDataCompressionForBlockingShuffle() throws Exception {
-               executeTest(createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, ResultPartitionType.BLOCKING, ExecutionMode.BATCH));
-       }
-
-       private void executeTest(JobGraph jobGraph) throws Exception {
                Configuration configuration = new Configuration();
-               configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
                configuration.setBoolean(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
 
-               final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-                       .setConfiguration(configuration)
-                       .setNumTaskManagers(NUM_TASKMANAGERS)
-                       .setNumSlotsPerTaskManager(NUM_SLOTS)
-                       .build();
-
-               try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
-                       miniCluster.start();
-
-                       MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
-                       // wait for the submission to succeed
-                       JobID jobID = miniClusterClient.submitJob(jobGraph).get();
-
-                       JobResult jobResult = miniClusterClient.requestJobResult(jobID).get();
-                       if (jobResult.getSerializedThrowable().isPresent()) {
-                               throw new AssertionError(jobResult.getSerializedThrowable().get());
-                       }
-               }
+               final JobGraph jobGraph = createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
+               // run on a MiniCluster; a failed job surfaces as an AssertionError
+               JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
        }
 
        private static JobGraph createJobGraph(

Reply via email to