xintongsong commented on code in PR #23851: URL: https://github.com/apache/flink/pull/23851#discussion_r1501757320
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java: ########## @@ -600,73 +605,138 @@ private void redistributeBuffers() { } // All buffers, which are not among the required ones - final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; + int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; if (numAvailableMemorySegment == 0) { // in this case, we need to redistribute buffers so that every pool gets its minimum for (LocalBufferPool bufferPool : resizableBufferPools) { - bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments()); + bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments()); } return; } - /* - * With buffer pools being potentially limited, let's distribute the available memory - * segments based on the capacity of each buffer pool, i.e. the maximum number of segments - * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools - * it may be less. Based on this and the sum of all these values (totalCapacity), we build - * a ratio that we use to distribute the buffers. - */ - - long totalCapacity = 0; // long to avoid int overflow + // Calculates the number of buffers that can be redistributed and the total weight of buffer + // pools that are resizable + int totalWeight = 0; + int numBuffersToBeRedistributed = numAvailableMemorySegment; + for (LocalBufferPool bufferPool : allBufferPools) { + if (resizableBufferPools.contains(bufferPool)) { Review Comment: Why not just loop over `resizableBufferPools`? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateSpecUtils.java: ########## @@ -112,21 +108,17 @@ private static int getExclusiveBuffersPerChannel( (requiredBuffersPerGate - 1) / numInputChannels); } - private static int getRequiredBuffersTargetPerGate( + private static int getExpectedBuffersTargetPerGate( int numInputChannels, int configuredNetworkBuffersPerChannel) { return numInputChannels * configuredNetworkBuffersPerChannel + 1; } - private static int getTotalBuffersTargetPerGate( + /** */ Review Comment: Should be removed. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java: ########## @@ -600,73 +605,138 @@ private void redistributeBuffers() { } // All buffers, which are not among the required ones - final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; + int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; if (numAvailableMemorySegment == 0) { // in this case, we need to redistribute buffers so that every pool gets its minimum for (LocalBufferPool bufferPool : resizableBufferPools) { - bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments()); + bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments()); } return; } - /* - * With buffer pools being potentially limited, let's distribute the available memory - * segments based on the capacity of each buffer pool, i.e. the maximum number of segments - * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools - * it may be less. Based on this and the sum of all these values (totalCapacity), we build - * a ratio that we use to distribute the buffers. 
- */ - - long totalCapacity = 0; // long to avoid int overflow + // Calculates the number of buffers that can be redistributed and the total weight of buffer + // pools that are resizable + int totalWeight = 0; + int numBuffersToBeRedistributed = numAvailableMemorySegment; + for (LocalBufferPool bufferPool : allBufferPools) { + if (resizableBufferPools.contains(bufferPool)) { + numBuffersToBeRedistributed += bufferPool.getMinNumberOfMemorySegments(); + totalWeight += bufferPool.getExpectedNumberOfMemorySegments(); + } + } + // First, all buffers are allocated proportionally according to the expected values of each + // pool as weights. However, due to the constraints of minimum and maximum values, the + // actual number of buffers distributed may be more or less than the total number of + // buffers. + int totalAllocated = 0; + Map<LocalBufferPool, Integer> cachedPoolSize = new HashMap<>(resizableBufferPools.size()); for (LocalBufferPool bufferPool : resizableBufferPools) { - int excessMax = - bufferPool.getMaxNumberOfMemorySegments() - - bufferPool.getNumberOfRequiredMemorySegments(); - totalCapacity += Math.min(numAvailableMemorySegment, excessMax); + int expectedNumBuffers = + bufferPool.getExpectedNumberOfMemorySegments() + * numBuffersToBeRedistributed + / totalWeight; + int actualAllocated = + Math.min( + bufferPool.getMaxNumberOfMemorySegments(), + Math.max( + bufferPool.getMinNumberOfMemorySegments(), expectedNumBuffers)); + cachedPoolSize.put(bufferPool, actualAllocated); + totalAllocated += actualAllocated; } - // no capacity to receive additional buffers? - if (totalCapacity == 0) { - return; // necessary to avoid div by zero when nothing to re-distribute + // Now we need to deal with this difference, which may be greater than zero or less than + // zero. 
+ int delta = numBuffersToBeRedistributed - totalAllocated; + while (true) { + int remaining = redistributeBuffers(delta, cachedPoolSize); + + // Stop the loop iteration when there is no remaining segments to be redistributed or + // all local buffer pools have reached the max number. + if (remaining == 0 || remaining == delta) { + for (LocalBufferPool bufferPool : resizableBufferPools) { + bufferPool.setNumBuffers( + cachedPoolSize.getOrDefault( + bufferPool, bufferPool.getMinNumberOfMemorySegments())); + } + break; + } + delta = remaining; } + } - // since one of the arguments of 'min(a,b)' is a positive int, this is actually - // guaranteed to be within the 'int' domain - // (we use a checked downCast to handle possible bugs more gracefully). - final int memorySegmentsToDistribute = - MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity)); + /** + * @param delta the buffers to be redistributed. + * @param cachedPoolSize the map to cache the intermediate result. + * @return the remaining buffers that can continue to be redistributed. + */ + private int redistributeBuffers(int delta, Map<LocalBufferPool, Integer> cachedPoolSize) { + Set<LocalBufferPool> poolsToBeRedistributed = new HashSet<>(); - long totalPartsUsed = 0; // of totalCapacity - int numDistributedMemorySegment = 0; - for (LocalBufferPool bufferPool : resizableBufferPools) { - int excessMax = - bufferPool.getMaxNumberOfMemorySegments() - - bufferPool.getNumberOfRequiredMemorySegments(); + if (delta > 0) { + // In this case, we need to allocate the remaining buffers to pools that have + // not yet reached their maximum number. 
- // shortcut - if (excessMax == 0) { - continue; + int totalWeight = 0; + for (LocalBufferPool bufferPool : resizableBufferPools) { + if (cachedPoolSize.get(bufferPool) < bufferPool.getMaxNumberOfMemorySegments()) { + poolsToBeRedistributed.add(bufferPool); + totalWeight += bufferPool.getExpectedNumberOfMemorySegments(); + } } - totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax); + int totalAllocated = 0; + for (LocalBufferPool bufferPool : poolsToBeRedistributed) { + float fraction = + (float) bufferPool.getExpectedNumberOfMemorySegments() / totalWeight; Review Comment: Also, we may consider calculating the ratio of total-buffers to total-weight once, rather than the ratio of each LBP's weight to total-weight. The math here is that the following formulas are equivalent, while the second one avoids a division operation for each LBP. ``` LBP_A_extra_buffers = (LBP_A_weight / total_weight) * total_extra_buffers LBP_A_extra_buffers = (total_extra_buffers / total_weight) * LBP_A_weight ``` ########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -38,6 +38,9 @@ public class NettyShuffleEnvironmentOptions { private static final String HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME = "taskmanager.network.hybrid-shuffle.remote.path"; + private static final String HYBRID_SHUFFLE_MEMORY_DECOUPLING_OPTION_NAME = + "taskmanager.network.hybrid-shuffle.memory-decoupling.enabled"; Review Comment: Why do we need this string constant? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java: ########## @@ -54,8 +53,8 @@ public class BufferManager implements BufferListener, BufferRecycler { /** The available buffer queue wraps both exclusive and requested floating buffers. */ private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue(); - /** The buffer provider for requesting exclusive buffers. 
*/ - private final MemorySegmentProvider globalPool; + /** The buffer provider for requesting exclusive buffers during recovery. */ + @Nullable private MemorySegmentProvider globalPool; Review Comment: 1. Why is this non-final? It's never assigned a new value after being constructed. 2. Why is this nullable? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java: ########## @@ -600,73 +605,138 @@ private void redistributeBuffers() { } // All buffers, which are not among the required ones - final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; + int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; if (numAvailableMemorySegment == 0) { // in this case, we need to redistribute buffers so that every pool gets its minimum for (LocalBufferPool bufferPool : resizableBufferPools) { - bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments()); + bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments()); } return; } - /* - * With buffer pools being potentially limited, let's distribute the available memory - * segments based on the capacity of each buffer pool, i.e. the maximum number of segments - * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools - * it may be less. Based on this and the sum of all these values (totalCapacity), we build - * a ratio that we use to distribute the buffers. 
- */ - - long totalCapacity = 0; // long to avoid int overflow + // Calculates the number of buffers that can be redistributed and the total weight of buffer + // pools that are resizable + int totalWeight = 0; + int numBuffersToBeRedistributed = numAvailableMemorySegment; + for (LocalBufferPool bufferPool : allBufferPools) { + if (resizableBufferPools.contains(bufferPool)) { + numBuffersToBeRedistributed += bufferPool.getMinNumberOfMemorySegments(); + totalWeight += bufferPool.getExpectedNumberOfMemorySegments(); + } + } + // First, all buffers are allocated proportionally according to the expected values of each + // pool as weights. However, due to the constraints of minimum and maximum values, the + // actual number of buffers distributed may be more or less than the total number of + // buffers. + int totalAllocated = 0; + Map<LocalBufferPool, Integer> cachedPoolSize = new HashMap<>(resizableBufferPools.size()); for (LocalBufferPool bufferPool : resizableBufferPools) { - int excessMax = - bufferPool.getMaxNumberOfMemorySegments() - - bufferPool.getNumberOfRequiredMemorySegments(); - totalCapacity += Math.min(numAvailableMemorySegment, excessMax); + int expectedNumBuffers = + bufferPool.getExpectedNumberOfMemorySegments() + * numBuffersToBeRedistributed + / totalWeight; + int actualAllocated = + Math.min( + bufferPool.getMaxNumberOfMemorySegments(), + Math.max( + bufferPool.getMinNumberOfMemorySegments(), expectedNumBuffers)); + cachedPoolSize.put(bufferPool, actualAllocated); + totalAllocated += actualAllocated; } - // no capacity to receive additional buffers? - if (totalCapacity == 0) { - return; // necessary to avoid div by zero when nothing to re-distribute + // Now we need to deal with this difference, which may be greater than zero or less than + // zero. + int delta = numBuffersToBeRedistributed - totalAllocated; + while (true) { Review Comment: `while (true)` can be risky. It would be a good practice to always add a breaking condition. 
In this case, we can move the if-condition here, and move the body of the if-block to after the while-loop. ########## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java: ########## @@ -420,6 +421,9 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration( pageSize, configuration.get( NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH)) + .setMemoryDecouplingEnabled( + configuration.getBoolean( Review Comment: `getBoolean` is deprecated. Use `get` instead. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java: ########## @@ -600,73 +605,138 @@ private void redistributeBuffers() { } // All buffers, which are not among the required ones - final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; + int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; if (numAvailableMemorySegment == 0) { // in this case, we need to redistribute buffers so that every pool gets its minimum for (LocalBufferPool bufferPool : resizableBufferPools) { - bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments()); + bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments()); } return; } - /* - * With buffer pools being potentially limited, let's distribute the available memory - * segments based on the capacity of each buffer pool, i.e. the maximum number of segments - * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools - * it may be less. Based on this and the sum of all these values (totalCapacity), we build - * a ratio that we use to distribute the buffers. 
- */ - - long totalCapacity = 0; // long to avoid int overflow + // Calculates the number of buffers that can be redistributed and the total weight of buffer + // pools that are resizable + int totalWeight = 0; + int numBuffersToBeRedistributed = numAvailableMemorySegment; + for (LocalBufferPool bufferPool : allBufferPools) { + if (resizableBufferPools.contains(bufferPool)) { + numBuffersToBeRedistributed += bufferPool.getMinNumberOfMemorySegments(); + totalWeight += bufferPool.getExpectedNumberOfMemorySegments(); + } + } + // First, all buffers are allocated proportionally according to the expected values of each + // pool as weights. However, due to the constraints of minimum and maximum values, the + // actual number of buffers distributed may be more or less than the total number of + // buffers. + int totalAllocated = 0; + Map<LocalBufferPool, Integer> cachedPoolSize = new HashMap<>(resizableBufferPools.size()); for (LocalBufferPool bufferPool : resizableBufferPools) { - int excessMax = - bufferPool.getMaxNumberOfMemorySegments() - - bufferPool.getNumberOfRequiredMemorySegments(); - totalCapacity += Math.min(numAvailableMemorySegment, excessMax); + int expectedNumBuffers = + bufferPool.getExpectedNumberOfMemorySegments() + * numBuffersToBeRedistributed + / totalWeight; + int actualAllocated = + Math.min( + bufferPool.getMaxNumberOfMemorySegments(), + Math.max( + bufferPool.getMinNumberOfMemorySegments(), expectedNumBuffers)); + cachedPoolSize.put(bufferPool, actualAllocated); + totalAllocated += actualAllocated; } - // no capacity to receive additional buffers? - if (totalCapacity == 0) { - return; // necessary to avoid div by zero when nothing to re-distribute + // Now we need to deal with this difference, which may be greater than zero or less than + // zero. 
+ int delta = numBuffersToBeRedistributed - totalAllocated; + while (true) { + int remaining = redistributeBuffers(delta, cachedPoolSize); + + // Stop the loop iteration when there is no remaining segments to be redistributed or + // all local buffer pools have reached the max number. + if (remaining == 0 || remaining == delta) { + for (LocalBufferPool bufferPool : resizableBufferPools) { + bufferPool.setNumBuffers( + cachedPoolSize.getOrDefault( + bufferPool, bufferPool.getMinNumberOfMemorySegments())); + } + break; + } + delta = remaining; } + } - // since one of the arguments of 'min(a,b)' is a positive int, this is actually - // guaranteed to be within the 'int' domain - // (we use a checked downCast to handle possible bugs more gracefully). - final int memorySegmentsToDistribute = - MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity)); + /** + * @param delta the buffers to be redistributed. + * @param cachedPoolSize the map to cache the intermediate result. + * @return the remaining buffers that can continue to be redistributed. + */ + private int redistributeBuffers(int delta, Map<LocalBufferPool, Integer> cachedPoolSize) { + Set<LocalBufferPool> poolsToBeRedistributed = new HashSet<>(); - long totalPartsUsed = 0; // of totalCapacity - int numDistributedMemorySegment = 0; - for (LocalBufferPool bufferPool : resizableBufferPools) { - int excessMax = - bufferPool.getMaxNumberOfMemorySegments() - - bufferPool.getNumberOfRequiredMemorySegments(); + if (delta > 0) { + // In this case, we need to allocate the remaining buffers to pools that have + // not yet reached their maximum number. 
- // shortcut - if (excessMax == 0) { - continue; + int totalWeight = 0; + for (LocalBufferPool bufferPool : resizableBufferPools) { + if (cachedPoolSize.get(bufferPool) < bufferPool.getMaxNumberOfMemorySegments()) { + poolsToBeRedistributed.add(bufferPool); + totalWeight += bufferPool.getExpectedNumberOfMemorySegments(); + } } - totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax); + int totalAllocated = 0; + for (LocalBufferPool bufferPool : poolsToBeRedistributed) { + float fraction = + (float) bufferPool.getExpectedNumberOfMemorySegments() / totalWeight; Review Comment: While the current implementation is correct, I'd suggest surrounding `(float) bufferPool.getExpectedNumberOfMemorySegments()` with extra brackets. This is simply a readability improvement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org