wsry commented on a change in pull request #15199: URL: https://github.com/apache/flink/pull/15199#discussion_r598080749
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
##########
@@ -119,13 +121,34 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
                         config.networkBufferSize(),
                         config.getRequestSegmentsTimeout());
+        // we create a separated buffer pool here for batch shuffle instead of reusing the network
+        // buffer pool directly to avoid potential side effects of memory contention, for example,
+        // dead lock or "insufficient network buffer" error
+        BatchShuffleReadBufferPool batchShuffleReadBufferPool =
+                new BatchShuffleReadBufferPool(
+                        config.batchShuffleReadMemoryBytes(), config.networkBufferSize());
+
+        // we create a separated IO executor pool here for batch shuffle instead of reusing the
+        // TaskManager IO executor pool directly to avoid the potential side effects of execution
+        // contention, for example, too long IO or waiting time leading to starvation or timeout
+        int numThreads =
+                batchShuffleReadBufferPool.getNumTotalBuffers()
+                        / Math.max(1, batchShuffleReadBufferPool.getNumBuffersPerRequest());

Review comment:
numTotalBuffers can be 0 if the total read memory (config.batchShuffleReadMemoryBytes()) is smaller than the buffer size (config.networkBufferSize()). In that case numThreads is 0 as well, because Math.max(1, ...) only guards the divisor, not the result.
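The arithmetic behind this comment can be checked in isolation. The sketch below is a standalone Java class, not Flink code: BatchShuffleThreadCountSketch and all of its methods are hypothetical, the 32 KiB buffer size, 16 KiB budget and 4 buffers per request are assumed example values, the total buffer count is assumed to be the integer quotient of the budget by the buffer size, and numThreads is assumed to size a fixed thread pool (the excerpt stops before that point). Clamping the result with Math.max(1, ...) is only one possible guard, not necessarily the fix the PR adopted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical standalone sketch, not Flink code: it only reproduces the arithmetic under review.
class BatchShuffleThreadCountSketch {

    // Assumption: the total buffer count is the integer quotient of the byte budget by the
    // buffer size, so any budget smaller than one buffer yields 0 buffers.
    static int numTotalBuffers(long totalBytes, int bufferSize) {
        return (int) (totalBytes / bufferSize);
    }

    // Same shape as the expression in the diff: only the divisor is clamped to at least 1,
    // so a numerator of 0 still produces 0 threads.
    static int numThreadsUnguarded(int numTotalBuffers, int numBuffersPerRequest) {
        return numTotalBuffers / Math.max(1, numBuffersPerRequest);
    }

    // One possible guard (hypothetical): clamp the result as well, so it is never below 1.
    static int numThreadsGuarded(int numTotalBuffers, int numBuffersPerRequest) {
        return Math.max(1, numThreadsUnguarded(numTotalBuffers, numBuffersPerRequest));
    }

    // Assumption: numThreads sizes a fixed thread pool. Executors.newFixedThreadPool throws
    // IllegalArgumentException when asked for 0 threads; this reports whether that happens.
    static boolean fixedPoolRejects(int numThreads) {
        try {
            ExecutorService pool = Executors.newFixedThreadPool(numThreads);
            pool.shutdown();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024;   // assumed example buffer size (32 KiB)
        long totalBytes = 16 * 1024;  // assumed example budget, smaller than one buffer
        int numBuffersPerRequest = 4; // assumed example value

        int buffers = numTotalBuffers(totalBytes, bufferSize);
        int unguarded = numThreadsUnguarded(buffers, numBuffersPerRequest);
        int guarded = numThreadsGuarded(buffers, numBuffersPerRequest);

        System.out.println("numTotalBuffers = " + buffers);        // 0
        System.out.println("unguarded numThreads = " + unguarded); // 0
        System.out.println("guarded numThreads = " + guarded);     // 1
        System.out.println("pool rejects unguarded: " + fixedPoolRejects(unguarded)); // true
        System.out.println("pool rejects guarded: " + fixedPoolRejects(guarded));     // false
    }
}
```

Whether a floor of one thread is the right fallback, or whether a budget smaller than one buffer should instead be rejected when the configuration is validated, is a design choice the review comment leaves open.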