zhijiangW commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r419520547
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -105,12 +98,15 @@ public RemoteInputChannel(
         int maxBackoff,
         InputChannelMetrics metrics) {

-        super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff,
-            metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());
+        super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics);

         this.connectionId = checkNotNull(connectionId);
         this.connectionManager = checkNotNull(connectionManager);
-        this.bufferManager = new BufferManager(this, 0);
+        // In theory it should get the total number of states to indicate the numRequiredBuffers.
+        // Since we can not get this information in advance, and considering only one input channel
+        // will read state at the same time by design, then we give a maximum value here to reduce
+        // unnecessary interactions with buffer pool during recovery.
+        this.bufferManager = new BufferManager(this, Integer.MAX_VALUE);

Review comment:
As I explained for the `LocalInputChannel` case, this `numRequiredBuffers` setting is only a small optimization. We could actually unify both channels to use 0 and adjust the value when floating buffers are really requested during processing. ATM only one input channel is unspilling at a time, so it makes sense for this channel to grab all the available floating buffers now. After the channel finishes unspilling, it releases all of its floating buffers back to the `LocalBufferPool`, where they can be reused by the next unspilling channel.

The `numRequiredBuffers` factor was a bit tricky to design before. When an exclusive buffer is recycled, or a floating buffer is recycled and the listener is notified that it is available, the listener double-checks against `numRequiredBuffers` whether it still needs more floating buffers ATM. If it does not, the floating buffer is returned to the local pool to be assigned to other listeners.
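
To make that double check concrete, here is a minimal sketch under stated assumptions. `SketchPool`, `SketchChannel`, and `FloatingBufferSketch` are hypothetical names invented for illustration; they are not Flink classes and do not reproduce the real `BufferManager` or `LocalBufferPool` logic. The model ignores exclusive buffers, threading, and backlog accounting, and only shows how a `numRequiredBuffers` threshold decides whether an offered floating buffer is kept or handed back.

```java
/** Hypothetical stand-in for a shared pool of floating buffers (not Flink's LocalBufferPool). */
final class SketchPool {
    private int available;

    SketchPool(int available) {
        this.available = available;
    }

    /** Takes one buffer out of the pool if any is left. */
    boolean tryTake() {
        if (available == 0) {
            return false;
        }
        available--;
        return true;
    }

    /** Puts one buffer back into the pool. */
    void giveBack() {
        available++;
    }

    int available() {
        return available;
    }
}

/** Hypothetical listener that models only the numRequiredBuffers double check. */
final class SketchChannel {
    private final SketchPool pool;
    private final int numRequiredBuffers;
    private int heldBuffers;

    SketchChannel(SketchPool pool, int numRequiredBuffers) {
        this.pool = pool;
        this.numRequiredBuffers = numRequiredBuffers;
    }

    /** Called with a buffer already taken from the pool; keeps it only if still needed. */
    boolean onBufferAvailable() {
        if (heldBuffers < numRequiredBuffers) {
            heldBuffers++;
            return true;
        }
        // Not needed right now: hand the buffer back so another listener can use it.
        pool.giveBack();
        return false;
    }

    /** Returns every held buffer to the pool, as after unspilling finishes. */
    void releaseAll() {
        while (heldBuffers > 0) {
            heldBuffers--;
            pool.giveBack();
        }
    }

    int held() {
        return heldBuffers;
    }
}

final class FloatingBufferSketch {
    /** Offers pool buffers to the channel until the pool is empty or the channel declines. */
    static void offerAll(SketchPool pool, SketchChannel channel) {
        while (pool.tryTake()) {
            if (!channel.onBufferAvailable()) {
                break;
            }
        }
    }
}
```

In this model, a channel created with `Integer.MAX_VALUE` never declines an offered buffer, so it holds on to everything until `releaseAll()` is called, while a channel created with 0 hands every offered buffer straight back to the pool.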

For the input channel unspill case, we can assume that the current channel always needs more floating buffers until it finishes. That avoids returning a floating buffer to the local pool only to request it from the pool again the next time it is needed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org