TanYuxin-tyx commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1479461473


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -224,6 +246,16 @@ private int tryRequestBuffers() {
         return numRequestedBuffers;
     }
 
+    /**
+     * The {@link LocalRecoveredInputChannel} also needs buffers to store the 
state, however, the
+     * expected size of local buffer pool is calculated with the number of 
remote input channels. So
+     * we request exclusive buffers for the {@link LocalRecoveredInputChannel} 
from the global
+     * buffer pool and these buffers are released once the recovery is finish.
+     */
+    private boolean requestExclusiveBufferFromGlobal() {

Review Comment:
   The method name is a little strange here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -148,28 +161,37 @@ private void resizeBufferQueue() {
     }
 
     /** Requests exclusive buffers from the provider. */
-    void requestExclusiveBuffers() {
-        synchronized (bufferQueue) {
-            checkState(numExclusiveBuffers >= 0, "Num exclusive buffers must 
be non-negative.");
-            resizeBufferQueue();
-            if (numExclusiveBuffers == 0) {
-                return;
-            }
+    void requestExclusiveBuffers() throws IOException {
+        checkState(numExclusiveBuffers >= 0, "Num exclusive buffers must be 
non-negative.");

Review Comment:
   Why do these lines remove the lock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to