TanYuxin-tyx commented on code in PR #23851: URL: https://github.com/apache/flink/pull/23851#discussion_r1479461473
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java: ########## @@ -224,6 +246,16 @@ private int tryRequestBuffers() { return numRequestedBuffers; } + /** + * The {@link LocalRecoveredInputChannel} also needs buffers to store the state, however, the + * expected size of local buffer pool is calculated with the number of remote input channels. So + * we request exclusive buffers for the {@link LocalRecoveredInputChannel} from the global + * buffer pool and these buffers are released once the recovery is finished. + */ + private boolean requestExclusiveBufferFromGlobal() { Review Comment: The method name is a little strange here. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java: ########## @@ -148,28 +161,37 @@ private void resizeBufferQueue() { } /** Requests exclusive buffers from the provider. */ - void requestExclusiveBuffers() { - synchronized (bufferQueue) { - checkState(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative."); - resizeBufferQueue(); - if (numExclusiveBuffers == 0) { - return; - } + void requestExclusiveBuffers() throws IOException { + checkState(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative."); Review Comment: Why do these lines remove the lock? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org