Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r152985049 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- What about the non-bounded partition type that we use for batch processing? Shouldn't we use an unbounded number of floating buffers there, as previously?
---