reswqa commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1503610332


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/LegacyTableSinkITCase.scala:
##########
@@ -277,6 +277,7 @@ class LegacyTableSinkITCase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+    env.setParallelism(4)

Review Comment:
   What happens if we don't set parallelism here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -73,32 +69,32 @@ public static Tuple3<Integer, Integer, Integer> getMinMaxNetworkBuffersPerResult
             final int sortShuffleMinBuffers,
             final int numSubpartitions,
             final boolean enableTieredStorage,
+            final boolean enableMemoryDecoupling,
             final int tieredStoreExclusiveBuffers,
+            final int tieredStorageMinBuffersPerResultPartition,
             final ResultPartitionType type) {
         boolean isSortShuffle =
                 type.isBlockingOrBlockingPersistentResultPartition()
                         && numSubpartitions >= sortShuffleMinParallelism;
-        int expected;
-        if (isSortShuffle) {
-            expected = sortShuffleMinBuffers;
-        } else {
-            expected =
-                    enableTieredStorage
-                            ? Math.min(tieredStoreExclusiveBuffers, numSubpartitions + 1)
-                            : (numSubpartitions + 1);
-        }
+        boolean isMemoryDecouplingEnabled =
+                type.isHybridResultPartition() && enableTieredStorage && enableMemoryDecoupling;
 
-        int min = expected;
-        if (type.isHybridResultPartition()) {
-            min = Math.min(DEFAULT_HYBRID_SHUFFLE_MIN_BUFFERS, expected);
-        }
+        int expected =
+                isSortShuffle
+                        ? sortShuffleMinBuffers
+                        : (enableTieredStorage
+                                ? Math.min(tieredStoreExclusiveBuffers, numSubpartitions + 1)
+                                : (numSubpartitions + 1));
+
+        int min = isMemoryDecouplingEnabled ? tieredStorageMinBuffersPerResultPartition : expected;
+        expected = Math.max(min, expected);
 
         int max =
                 type.isBounded()
                         ? numSubpartitions * configuredNetworkBuffersPerChannel
                                 + numFloatingBuffersPerGate
                         : (isSortShuffle
-                                ? 4 * numSubpartitions
+                                ? Math.max(min, 4 * numSubpartitions)

Review Comment:
   Not sure, but why do we change the max buffers for sort shuffle? IIUC, this PR only focuses on hybrid shuffle.
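
   To make the question concrete, here is a minimal sketch of only the sort-shuffle branch of the `max` computation, before and after this hunk. It assumes blocking and hybrid partition types are mutually exclusive, so `isMemoryDecouplingEnabled` is false and `min` equals `sortShuffleMinBuffers` on this branch. The class, method names, and input values are hypothetical and not part of the PR.

```java
// Hypothetical, simplified sketch; not the actual NettyShuffleUtils code.
final class SortShuffleMaxSketch {

    /** Max buffers on the sort-shuffle branch before this change. */
    static int oldMax(int numSubpartitions) {
        return 4 * numSubpartitions;
    }

    /**
     * Max buffers on the sort-shuffle branch after this change.
     * Assumption: memory decoupling is off for blocking partitions,
     * so min == expected == sortShuffleMinBuffers here.
     */
    static int newMax(int sortShuffleMinBuffers, int numSubpartitions) {
        int min = sortShuffleMinBuffers;
        return Math.max(min, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        // Illustrative values only.
        System.out.println(oldMax(100));      // 400
        System.out.println(newMax(512, 100)); // 512
        System.out.println(newMax(512, 200)); // 800
    }
}
```

   Under these assumptions the result only differs when `sortShuffleMinBuffers` exceeds `4 * numSubpartitions`, so the change does reach sort shuffle in that case.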



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java:
##########
@@ -40,8 +40,11 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
     @Override
     boolean isDestroyed();
 
+    /** Returns the number of expected memory segments of this buffer pool. */
+    int getExpectedNumberOfMemorySegments();

Review Comment:
   We should explain in the Javadoc what "expected memory segments" means. It would also be good to find a suitable place to explain the new buffer allocation algorithm.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -272,16 +273,22 @@ public void reserveSegments(int numberOfSegmentsToReserve) throws IOException {
                 "Can not reserve more segments than number of minimum segments.");
 
         CompletableFuture<?> toNotify = null;
+
+        int numSegmentsNeeded;
         synchronized (availableMemorySegments) {
             checkDestroyed();
+            numSegmentsNeeded = numberOfSegmentsToReserve - numberOfRequestedMemorySegments;
+        }
 
-            if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
-                availableMemorySegments.addAll(
-                        networkBufferPool.requestPooledMemorySegmentsBlocking(
-                                numberOfSegmentsToReserve - numberOfRequestedMemorySegments));
+        if (numSegmentsNeeded > 0) {

Review Comment:
   Can you explain why the deadlock is triggered? I'm missing some context.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -149,6 +149,9 @@ public class SingleInputGate extends IndexedInputGate {
     /** The number of input channels (equivalent to the number of consumed partitions). */
     private final int numberOfInputChannels;
 
+    /** The number of local input channels. */
+    private int numberOfLocalInputChannels;

Review Comment:
   Please annotate this field with `@GuardedBy("requestLock")`.
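
   A minimal sketch of what this could look like, assuming every read and write of the counter goes through `requestLock`. The class `GateSketch`, the method `onLocalChannelAdded`, and the nested stand-in annotation are hypothetical; the stand-in only replaces `javax.annotation.concurrent.GuardedBy` so that the snippet compiles on its own.

```java
// Hypothetical, simplified gate; not the actual SingleInputGate.
final class GateSketch {

    // Stand-in for javax.annotation.concurrent.GuardedBy (documentation only).
    @interface GuardedBy {
        String value();
    }

    private final Object requestLock = new Object();

    /** The number of local input channels. */
    @GuardedBy("requestLock")
    private int numberOfLocalInputChannels;

    /** Hypothetical mutator: counts one more local channel under the lock. */
    void onLocalChannelAdded() {
        synchronized (requestLock) {
            numberOfLocalInputChannels++;
        }
    }

    /** Reads the counter under the same lock, so other threads see a consistent value. */
    int getNumberOfLocalInputChannels() {
        synchronized (requestLock) {
            return numberOfLocalInputChannels;
        }
    }
}
```

   The annotation itself enforces nothing at runtime; it documents the locking contract so that readers and static analysis tools can check it.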



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -276,10 +274,13 @@ protected int getNumberOfQueuedBuffers() {
 
     public Buffer requestBufferBlocking() throws InterruptedException, IOException {
         // not in setup to avoid assigning buffers unnecessarily if there is no state
-        if (!exclusiveBuffersAssigned) {
-            bufferManager.requestExclusiveBuffers(networkBuffersPerChannel);
-            exclusiveBuffersAssigned = true;
+        if (this instanceof LocalRecoveredInputChannel) {

Review Comment:
   Could we override this method for LocalRecoveredInputChannel instead of doing the instanceof check?
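
   Roughly the shape I have in mind, as a sketch only. The classes below are hypothetical, simplified stand-ins, and the returned strings are placeholders for the real buffer-request logic, which is not reproduced here.

```java
// Hypothetical, simplified stand-ins; not the actual Flink channel classes.
abstract class RecoveredChannelSketch {

    /** Default behavior shared by recovered channels (placeholder result). */
    String requestBufferBlocking() {
        return "default buffer request";
    }
}

/** Remote channels keep the inherited default behavior. */
final class RemoteRecoveredChannelSketch extends RecoveredChannelSketch {}

/** Local channels override the method instead of being detected via instanceof. */
final class LocalRecoveredChannelSketch extends RecoveredChannelSketch {

    @Override
    String requestBufferBlocking() {
        return "local-specific buffer request";
    }
}
```

   With this layout the base class no longer needs to know about its subclasses, and each channel type keeps its own logic in one place.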



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -132,15 +149,73 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
         }
     }
 
-    /** Requests exclusive buffers from the provider. */
-    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
+    private void resizeBufferQueue() {
+        if (shouldRequestExclusiveBufferFromGlobal()) {
+            return;
+        }
+
+        SingleInputGate inputGate = inputChannel.inputGate;
+        int currentSize = inputGate.getBufferPool().getNumBuffers();
+        int numRemoteChannels =
+                inputGate.getNumberOfInputChannels() - inputGate.getNumberOfLocalInputChannels();

Review Comment:
   Is `inputGate.getNumberOfLocalInputChannels()` thread safe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
