xintongsong commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1501757320


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +605,138 @@ private void redistributeBuffers() {
         }
 
         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }
 
-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
-
-        long totalCapacity = 0; // long to avoid int overflow
+        // Calculates the number of buffers that can be redistributed and the total weight of buffer
+        // pools that are resizable
+        int totalWeight = 0;
+        int numBuffersToBeRedistributed = numAvailableMemorySegment;
+        for (LocalBufferPool bufferPool : allBufferPools) {
+            if (resizableBufferPools.contains(bufferPool)) {

Review Comment:
   Why not just loop over `resizableBufferPools`?
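
A minimal sketch of this suggestion, assuming `resizableBufferPools` is always a subset of `allBufferPools` (the diff does not show this, so treat it as an assumption). `Pool` is a hypothetical stand-in for `LocalBufferPool`, and the method names are illustrative only. Under that assumption both loops compute the same two sums, and the direct loop drops the `contains` lookup.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class ResizablePoolLoopSketch {

    /** Hypothetical stand-in for LocalBufferPool, reduced to the two values the loop reads. */
    public static final class Pool {
        final int minSegments;
        final int expectedSegments;

        public Pool(int minSegments, int expectedSegments) {
            this.minSegments = minSegments;
            this.expectedSegments = expectedSegments;
        }
    }

    /** Shape used in the diff: walk every pool and keep only the resizable ones. */
    public static int[] sumsViaFilter(Set<Pool> all, Set<Pool> resizable, int available) {
        int toRedistribute = available;
        int totalWeight = 0;
        for (Pool pool : all) {
            if (resizable.contains(pool)) {
                toRedistribute += pool.minSegments;
                totalWeight += pool.expectedSegments;
            }
        }
        return new int[] {toRedistribute, totalWeight};
    }

    /** Suggested shape: walk the resizable pools directly. */
    public static int[] sumsDirect(Set<Pool> resizable, int available) {
        int toRedistribute = available;
        int totalWeight = 0;
        for (Pool pool : resizable) {
            toRedistribute += pool.minSegments;
            totalWeight += pool.expectedSegments;
        }
        return new int[] {toRedistribute, totalWeight};
    }

    public static void main(String[] args) {
        Pool a = new Pool(2, 10);
        Pool b = new Pool(1, 5);
        Pool fixed = new Pool(4, 4); // not resizable, so both loops must ignore it
        Set<Pool> resizable = new LinkedHashSet<>(Arrays.asList(a, b));
        Set<Pool> all = new LinkedHashSet<>(Arrays.asList(a, b, fixed));
        int[] filtered = sumsViaFilter(all, resizable, 7);
        int[] direct = sumsDirect(resizable, 7);
        System.out.println(Arrays.equals(filtered, direct));
    }
}
```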



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateSpecUtils.java:
##########
@@ -112,21 +108,17 @@ private static int getExclusiveBuffersPerChannel(
                 (requiredBuffersPerGate - 1) / numInputChannels);
     }
 
-    private static int getRequiredBuffersTargetPerGate(
+    private static int getExpectedBuffersTargetPerGate(
             int numInputChannels, int configuredNetworkBuffersPerChannel) {
         return numInputChannels * configuredNetworkBuffersPerChannel + 1;
     }
 
-    private static int getTotalBuffersTargetPerGate(
+    /** */

Review Comment:
   This empty Javadoc comment should be removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +605,138 @@ private void redistributeBuffers() {
         }
 
         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }
 
-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
-
-        long totalCapacity = 0; // long to avoid int overflow
+        // Calculates the number of buffers that can be redistributed and the total weight of buffer
+        // pools that are resizable
+        int totalWeight = 0;
+        int numBuffersToBeRedistributed = numAvailableMemorySegment;
+        for (LocalBufferPool bufferPool : allBufferPools) {
+            if (resizableBufferPools.contains(bufferPool)) {
+                numBuffersToBeRedistributed += bufferPool.getMinNumberOfMemorySegments();
+                totalWeight += bufferPool.getExpectedNumberOfMemorySegments();
+            }
+        }
 
+        // First, all buffers are allocated proportionally according to the expected values of each
+        // pool as weights. However, due to the constraints of minimum and maximum values, the
+        // actual number of buffers distributed may be more or less than the total number of
+        // buffers.
+        int totalAllocated = 0;
+        Map<LocalBufferPool, Integer> cachedPoolSize = new HashMap<>(resizableBufferPools.size());
         for (LocalBufferPool bufferPool : resizableBufferPools) {
-            int excessMax =
-                    bufferPool.getMaxNumberOfMemorySegments()
-                            - bufferPool.getNumberOfRequiredMemorySegments();
-            totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
+            int expectedNumBuffers =
+                    bufferPool.getExpectedNumberOfMemorySegments()
+                            * numBuffersToBeRedistributed
+                            / totalWeight;
+            int actualAllocated =
+                    Math.min(
+                            bufferPool.getMaxNumberOfMemorySegments(),
+                            Math.max(
+                                    bufferPool.getMinNumberOfMemorySegments(), expectedNumBuffers));
+            cachedPoolSize.put(bufferPool, actualAllocated);
+            totalAllocated += actualAllocated;
         }
 
-        // no capacity to receive additional buffers?
-        if (totalCapacity == 0) {
-            return; // necessary to avoid div by zero when nothing to re-distribute
+        // Now we need to deal with this difference, which may be greater than zero or less than
+        // zero.
+        int delta = numBuffersToBeRedistributed - totalAllocated;
+        while (true) {
+            int remaining = redistributeBuffers(delta, cachedPoolSize);
+
+            // Stop the loop iteration when there is no remaining segments to be redistributed or
+            // all local buffer pools have reached the max number.
+            if (remaining == 0 || remaining == delta) {
+                for (LocalBufferPool bufferPool : resizableBufferPools) {
+                    bufferPool.setNumBuffers(
+                            cachedPoolSize.getOrDefault(
+                                    bufferPool, bufferPool.getMinNumberOfMemorySegments()));
+                }
+                break;
+            }
+            delta = remaining;
         }
+    }
 
-        // since one of the arguments of 'min(a,b)' is a positive int, this is actually
-        // guaranteed to be within the 'int' domain
-        // (we use a checked downCast to handle possible bugs more gracefully).
-        final int memorySegmentsToDistribute =
-                MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));
+    /**
+     * @param delta the buffers to be redistributed.
+     * @param cachedPoolSize the map to cache the intermediate result.
+     * @return the remaining buffers that can continue to be redistributed.
+     */
+    private int redistributeBuffers(int delta, Map<LocalBufferPool, Integer> cachedPoolSize) {
+        Set<LocalBufferPool> poolsToBeRedistributed = new HashSet<>();
 
-        long totalPartsUsed = 0; // of totalCapacity
-        int numDistributedMemorySegment = 0;
-        for (LocalBufferPool bufferPool : resizableBufferPools) {
-            int excessMax =
-                    bufferPool.getMaxNumberOfMemorySegments()
-                            - bufferPool.getNumberOfRequiredMemorySegments();
+        if (delta > 0) {
+            // In this case, we need to allocate the remaining buffers to pools that have
+            // not yet reached their maximum number.
 
-            // shortcut
-            if (excessMax == 0) {
-                continue;
+            int totalWeight = 0;
+            for (LocalBufferPool bufferPool : resizableBufferPools) {
+                if (cachedPoolSize.get(bufferPool) < bufferPool.getMaxNumberOfMemorySegments()) {
+                    poolsToBeRedistributed.add(bufferPool);
+                    totalWeight += bufferPool.getExpectedNumberOfMemorySegments();
+                }
             }
 
-            totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
+            int totalAllocated = 0;
+            for (LocalBufferPool bufferPool : poolsToBeRedistributed) {
+                float fraction =
+                        (float) bufferPool.getExpectedNumberOfMemorySegments() / totalWeight;

Review Comment:
   Also, we may consider computing the ratio of total extra buffers to total weight once, 
rather than the ratio of each LBP's weight to the total weight.
   
   Mathematically, the following two formulas should be equivalent, while the 
second one avoids a division operation for each LBP.
   
   ```
   LBP_A_extra_buffers = (LBP_A_weight / total_weight) * total_extra_buffers
   LBP_A_extra_buffers = (total_extra_buffers / total_weight) * LBP_A_weight
   ```
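
A small sketch of the two formulas above, under stated assumptions: weights are plain `int` values, the result is truncated to `int`, and the class and method names are hypothetical. How the PR rounds the result is not visible in this hunk. With `float` arithmetic the two variants can round differently for some inputs, so the sketch only checks a case where every intermediate value is exactly representable.

```java
import java.util.Arrays;

public class WeightedShareSketch {

    /** First formula: (weight / totalWeight) * totalExtra, one division per pool. */
    public static int[] perPoolFraction(int[] weights, int totalExtra) {
        int totalWeight = Arrays.stream(weights).sum();
        int[] extra = new int[weights.length];
        for (int i = 0; i < weights.length; i++) {
            float fraction = ((float) weights[i]) / totalWeight;
            extra[i] = (int) (fraction * totalExtra);
        }
        return extra;
    }

    /** Second formula: (totalExtra / totalWeight) * weight, one division in total. */
    public static int[] hoistedRatio(int[] weights, int totalExtra) {
        int totalWeight = Arrays.stream(weights).sum();
        float buffersPerWeight = ((float) totalExtra) / totalWeight;
        int[] extra = new int[weights.length];
        for (int i = 0; i < weights.length; i++) {
            extra[i] = (int) (buffersPerWeight * weights[i]);
        }
        return extra;
    }

    public static void main(String[] args) {
        int[] weights = {1, 1, 2};
        System.out.println(Arrays.toString(perPoolFraction(weights, 8)));
        System.out.println(Arrays.toString(hoistedRatio(weights, 8)));
    }
}
```

Both methods assume a non-zero total weight; the caller would need to guard against an empty set of pools.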



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -38,6 +38,9 @@ public class NettyShuffleEnvironmentOptions {
     private static final String HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME =
             "taskmanager.network.hybrid-shuffle.remote.path";
 
+    private static final String HYBRID_SHUFFLE_MEMORY_DECOUPLING_OPTION_NAME =
+            "taskmanager.network.hybrid-shuffle.memory-decoupling.enabled";

Review Comment:
   Why do we need this string constant?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -54,8 +53,8 @@ public class BufferManager implements BufferListener, BufferRecycler {
     /** The available buffer queue wraps both exclusive and requested floating buffers. */
     private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
 
-    /** The buffer provider for requesting exclusive buffers. */
-    private final MemorySegmentProvider globalPool;
+    /** The buffer provider for requesting exclusive buffers during recovery. */
+    @Nullable private MemorySegmentProvider globalPool;

Review Comment:
   1. Why is this non-final? It's never assigned a new value after being 
constructed.
   2. Why is this nullable?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +605,138 @@ private void redistributeBuffers() {
         }
 
         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }
 
-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
-
-        long totalCapacity = 0; // long to avoid int overflow
+        // Calculates the number of buffers that can be redistributed and the total weight of buffer
+        // pools that are resizable
+        int totalWeight = 0;
+        int numBuffersToBeRedistributed = numAvailableMemorySegment;
+        for (LocalBufferPool bufferPool : allBufferPools) {
+            if (resizableBufferPools.contains(bufferPool)) {
+                numBuffersToBeRedistributed += bufferPool.getMinNumberOfMemorySegments();
+                totalWeight += bufferPool.getExpectedNumberOfMemorySegments();
+            }
+        }
 
+        // First, all buffers are allocated proportionally according to the expected values of each
+        // pool as weights. However, due to the constraints of minimum and maximum values, the
+        // actual number of buffers distributed may be more or less than the total number of
+        // buffers.
+        int totalAllocated = 0;
+        Map<LocalBufferPool, Integer> cachedPoolSize = new HashMap<>(resizableBufferPools.size());
         for (LocalBufferPool bufferPool : resizableBufferPools) {
-            int excessMax =
-                    bufferPool.getMaxNumberOfMemorySegments()
-                            - bufferPool.getNumberOfRequiredMemorySegments();
-            totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
+            int expectedNumBuffers =
+                    bufferPool.getExpectedNumberOfMemorySegments()
+                            * numBuffersToBeRedistributed
+                            / totalWeight;
+            int actualAllocated =
+                    Math.min(
+                            bufferPool.getMaxNumberOfMemorySegments(),
+                            Math.max(
+                                    bufferPool.getMinNumberOfMemorySegments(), expectedNumBuffers));
+            cachedPoolSize.put(bufferPool, actualAllocated);
+            totalAllocated += actualAllocated;
         }
 
-        // no capacity to receive additional buffers?
-        if (totalCapacity == 0) {
-            return; // necessary to avoid div by zero when nothing to re-distribute
+        // Now we need to deal with this difference, which may be greater than zero or less than
+        // zero.
+        int delta = numBuffersToBeRedistributed - totalAllocated;
+        while (true) {

Review Comment:
   `while (true)` can be risky. It is good practice to always give a loop an 
explicit exit condition. In this case, we can move the if-condition here, and move 
the body of the if-block to after the while-loop.
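
One possible shape for that restructuring, sketched under assumptions: `redistributeOnce` is a hypothetical stand-in for the PR's `redistributeBuffers(delta, cachedPoolSize)` call, and the class and method names are illustrative. The exit test from the if-block becomes the loop condition, and the work that was inside the if-block would run once after the loop.

```java
import java.util.function.IntUnaryOperator;

public class BoundedRedistributionLoopSketch {

    /**
     * Repeats one redistribution round until nothing remains or a round makes no progress.
     * Returns the number of buffers still undistributed when the loop stops.
     */
    public static int runUntilSettled(int delta, IntUnaryOperator redistributeOnce) {
        int remaining;
        boolean madeProgress;
        do {
            remaining = redistributeOnce.applyAsInt(delta);
            madeProgress = remaining != delta;
            delta = remaining;
        } while (remaining != 0 && madeProgress);

        // In the PR, the former if-block body (calling setNumBuffers on each pool)
        // would go here, after the loop has terminated.
        return remaining;
    }

    public static void main(String[] args) {
        // Hypothetical round that can place at most 3 buffers per call.
        IntUnaryOperator placesUpToThree = d -> Math.max(0, d - 3);
        System.out.println(runUntilSettled(7, placesUpToThree));

        // Hypothetical round where every pool is already at its maximum.
        IntUnaryOperator placesNothing = d -> d;
        System.out.println(runUntilSettled(5, placesNothing));
    }
}
```

This keeps the same two stop conditions as the diff (`remaining == 0` or `remaining == delta`), only expressed as the negated loop condition.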



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java:
##########
@@ -420,6 +421,9 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
                                     pageSize,
                                     configuration.get(
                                             NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH))
+                            .setMemoryDecouplingEnabled(
+                                    configuration.getBoolean(

Review Comment:
   `getBoolean` is deprecated. Use `get` instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +605,138 @@ private void redistributeBuffers() {
         }
 
         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }
 
-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
-
-        long totalCapacity = 0; // long to avoid int overflow
+        // Calculates the number of buffers that can be redistributed and the total weight of buffer
+        // pools that are resizable
+        int totalWeight = 0;
+        int numBuffersToBeRedistributed = numAvailableMemorySegment;
+        for (LocalBufferPool bufferPool : allBufferPools) {
+            if (resizableBufferPools.contains(bufferPool)) {
+                numBuffersToBeRedistributed += bufferPool.getMinNumberOfMemorySegments();
+                totalWeight += bufferPool.getExpectedNumberOfMemorySegments();
+            }
+        }
 
+        // First, all buffers are allocated proportionally according to the expected values of each
+        // pool as weights. However, due to the constraints of minimum and maximum values, the
+        // actual number of buffers distributed may be more or less than the total number of
+        // buffers.
+        int totalAllocated = 0;
+        Map<LocalBufferPool, Integer> cachedPoolSize = new HashMap<>(resizableBufferPools.size());
         for (LocalBufferPool bufferPool : resizableBufferPools) {
-            int excessMax =
-                    bufferPool.getMaxNumberOfMemorySegments()
-                            - bufferPool.getNumberOfRequiredMemorySegments();
-            totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
+            int expectedNumBuffers =
+                    bufferPool.getExpectedNumberOfMemorySegments()
+                            * numBuffersToBeRedistributed
+                            / totalWeight;
+            int actualAllocated =
+                    Math.min(
+                            bufferPool.getMaxNumberOfMemorySegments(),
+                            Math.max(
+                                    bufferPool.getMinNumberOfMemorySegments(), expectedNumBuffers));
+            cachedPoolSize.put(bufferPool, actualAllocated);
+            totalAllocated += actualAllocated;
         }
 
-        // no capacity to receive additional buffers?
-        if (totalCapacity == 0) {
-            return; // necessary to avoid div by zero when nothing to re-distribute
+        // Now we need to deal with this difference, which may be greater than zero or less than
+        // zero.
+        int delta = numBuffersToBeRedistributed - totalAllocated;
+        while (true) {
+            int remaining = redistributeBuffers(delta, cachedPoolSize);
+
+            // Stop the loop iteration when there is no remaining segments to be redistributed or
+            // all local buffer pools have reached the max number.
+            if (remaining == 0 || remaining == delta) {
+                for (LocalBufferPool bufferPool : resizableBufferPools) {
+                    bufferPool.setNumBuffers(
+                            cachedPoolSize.getOrDefault(
+                                    bufferPool, bufferPool.getMinNumberOfMemorySegments()));
+                }
+                break;
+            }
+            delta = remaining;
         }
+    }
 
-        // since one of the arguments of 'min(a,b)' is a positive int, this is actually
-        // guaranteed to be within the 'int' domain
-        // (we use a checked downCast to handle possible bugs more gracefully).
-        final int memorySegmentsToDistribute =
-                MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));
+    /**
+     * @param delta the buffers to be redistributed.
+     * @param cachedPoolSize the map to cache the intermediate result.
+     * @return the remaining buffers that can continue to be redistributed.
+     */
+    private int redistributeBuffers(int delta, Map<LocalBufferPool, Integer> cachedPoolSize) {
+        Set<LocalBufferPool> poolsToBeRedistributed = new HashSet<>();
 
-        long totalPartsUsed = 0; // of totalCapacity
-        int numDistributedMemorySegment = 0;
-        for (LocalBufferPool bufferPool : resizableBufferPools) {
-            int excessMax =
-                    bufferPool.getMaxNumberOfMemorySegments()
-                            - bufferPool.getNumberOfRequiredMemorySegments();
+        if (delta > 0) {
+            // In this case, we need to allocate the remaining buffers to pools that have
+            // not yet reached their maximum number.
 
-            // shortcut
-            if (excessMax == 0) {
-                continue;
+            int totalWeight = 0;
+            for (LocalBufferPool bufferPool : resizableBufferPools) {
+                if (cachedPoolSize.get(bufferPool) < bufferPool.getMaxNumberOfMemorySegments()) {
+                    poolsToBeRedistributed.add(bufferPool);
+                    totalWeight += bufferPool.getExpectedNumberOfMemorySegments();
+                }
             }
 
-            totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
+            int totalAllocated = 0;
+            for (LocalBufferPool bufferPool : poolsToBeRedistributed) {
+                float fraction =
+                        (float) bufferPool.getExpectedNumberOfMemorySegments() / totalWeight;

Review Comment:
   While the current implementation is correct, I'd suggest surrounding 
`(float) bufferPool.getExpectedNumberOfMemorySegments()` with extra brackets. 
This is simply a readability improvement.
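
A short sketch of why the brackets are cosmetic here: in Java a cast binds more tightly than `/`, so the operand is converted to `float` before the division with or without them. The third method shows the mistake the brackets help a reader rule out at a glance. The class and method names are hypothetical.

```java
public class CastPrecedenceSketch {

    /** As written in the diff: the cast applies to the first operand only. */
    public static float withoutBrackets(int expected, int totalWeight) {
        return (float) expected / totalWeight;
    }

    /** Suggested form: same result, with the order of operations spelled out. */
    public static float withBrackets(int expected, int totalWeight) {
        return ((float) expected) / totalWeight;
    }

    /** Different meaning: integer division happens first, then the cast. */
    public static float castAfterDivision(int expected, int totalWeight) {
        return (float) (expected / totalWeight);
    }

    public static void main(String[] args) {
        System.out.println(withoutBrackets(3, 4));
        System.out.println(withBrackets(3, 4));
        System.out.println(castAfterDivision(3, 4));
    }
}
```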



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
