wsry commented on a change in pull request #15199:
URL: https://github.com/apache/flink/pull/15199#discussion_r598080749



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
##########
@@ -119,13 +121,34 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
                         config.networkBufferSize(),
                         config.getRequestSegmentsTimeout());
 
+        // we create a separated buffer pool here for batch shuffle instead of reusing the network
+        // buffer pool directly to avoid potential side effects of memory contention, for example,
+        // dead lock or "insufficient network buffer" error
+        BatchShuffleReadBufferPool batchShuffleReadBufferPool =
+                new BatchShuffleReadBufferPool(
+                        config.batchShuffleReadMemoryBytes(), config.networkBufferSize());
+
+        // we create a separated IO executor pool here for batch shuffle instead of reusing the
+        // TaskManager IO executor pool directly to avoid the potential side effects of execution
+        // contention, for example, too long IO or waiting time leading to starvation or timeout
+        int numThreads =
+                batchShuffleReadBufferPool.getNumTotalBuffers()
+                        / Math.max(1, batchShuffleReadBufferPool.getNumBuffersPerRequest());

Review comment:
       numTotalBuffers can be 0 if the configured total bytes are less than the buffer size, in which case numThreads above also evaluates to 0.
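
The edge case can be reproduced in isolation. The following is a minimal sketch, not the PR's code: the class `BatchShuffleThreadCountSketch` and its methods are hypothetical, and the assumption that the pool derives its buffer count as `totalBytes / bufferSize` is inferred from this comment rather than taken from `BatchShuffleReadBufferPool` itself. Clamping the result to at least 1 is only one possible fix; rejecting the configuration up front would be another.

```java
public class BatchShuffleThreadCountSketch {

    // Assumption: the pool computes its total buffer count by integer division.
    public static int numTotalBuffers(long totalBytes, int bufferSize) {
        return (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
    }

    // Same arithmetic as the diff: only the divisor is guarded against 0.
    public static int numThreadsAsInDiff(int numTotalBuffers, int numBuffersPerRequest) {
        return numTotalBuffers / Math.max(1, numBuffersPerRequest);
    }

    // One possible guard (hypothetical): clamp the quotient as well.
    public static int numThreadsGuarded(int numTotalBuffers, int numBuffersPerRequest) {
        return Math.max(1, numTotalBuffers / Math.max(1, numBuffersPerRequest));
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024;  // illustrative buffer size: 32 KiB
        long totalBytes = 16 * 1024; // illustrative read memory: 16 KiB, less than one buffer

        int total = numTotalBuffers(totalBytes, bufferSize);
        System.out.println("numTotalBuffers      = " + total);                        // 0
        System.out.println("numThreads (as-is)   = " + numThreadsAsInDiff(total, 0)); // 0
        System.out.println("numThreads (guarded) = " + numThreadsGuarded(total, 0));  // 1
    }
}
```

Whether a zero thread count is harmful depends on how `numThreads` is consumed, which this hunk does not show. As a point of reference, `java.util.concurrent.Executors.newFixedThreadPool(0)` throws `IllegalArgumentException`.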




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

