pnowojski commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r400327461
 
 

 ##########
 File path: flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 ##########
 @@ -174,6 +174,19 @@
                                " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
                                " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
 
+       /**
+        * Number of max backlogs can be used for each output subpartition.
+        */
+       @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+       public static final ConfigOption<Integer> NETWORK_MAX_BACKLOGS_PER_SUBPARTITION =
+                       key("taskmanager.network.max-backlogs-per-subpartition")
+                                       .defaultValue(Integer.MAX_VALUE)
+                                       .withDescription("Number of max backlogs can be used for each output subpartition." +
+                                                       " If a subpartition exceeds the number of max backlogs, it will make the ResultPartition unavailable and" +
+                                                       " block the processing. This benefits in reducing the in-flight data and speeding up the barrier alignment" +
+                                                       " when most of the buffers are going to one subpartition (data skew). This limitation is not strictly" +
+                                                       " guaranteed, which usually happens in one-to-many operators like flatmap.");
+
 
 Review comment:
   
  > Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will make the task become unavailable, cause back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and a high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records spanning multiple buffers or a single timer producing a large amount of data.
   
   ?
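A minimal sketch of the gating behaviour both the original description and the suggested wording describe, assuming availability is checked once per input record before it is processed. `BacklogLimitedPartition`, `processRecord` and `pollBuffer` are hypothetical names for illustration only; this is not Flink's actual `ResultPartition` implementation. It shows why one skewed subpartition blocks the whole partition, and why a one-to-many record (flatMap-like) can push the backlog past the limit.

```java
/**
 * Hypothetical, simplified model (not Flink's ResultPartition) of a partition
 * whose availability is gated by a per-subpartition backlog limit.
 */
final class BacklogLimitedPartition {
    private final int maxBacklogsPerSubpartition;
    // Number of finished buffers queued but not yet consumed, per subpartition.
    private final int[] backlogs;

    BacklogLimitedPartition(int numSubpartitions, int maxBacklogsPerSubpartition) {
        if (numSubpartitions <= 0 || maxBacklogsPerSubpartition <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        this.backlogs = new int[numSubpartitions];
        this.maxBacklogsPerSubpartition = maxBacklogsPerSubpartition;
    }

    /** The partition becomes unavailable as soon as ANY subpartition exceeds the limit. */
    boolean isAvailable() {
        for (int backlog : backlogs) {
            if (backlog > maxBacklogsPerSubpartition) {
                return false;
            }
        }
        return true;
    }

    /**
     * Processes one input record that produces {@code buffersProduced} buffers for a
     * single subpartition. Availability is checked only once, before processing, so a
     * one-to-many record can overshoot the limit. Returns false if back pressure blocked it.
     */
    boolean processRecord(int subpartition, int buffersProduced) {
        if (!isAvailable()) {
            return false; // back pressure: the caller must wait for the consumer to drain
        }
        backlogs[subpartition] += buffersProduced;
        return true;
    }

    /** Consumer side: a downstream reader drains one buffer from a subpartition. */
    void pollBuffer(int subpartition) {
        if (backlogs[subpartition] == 0) {
            throw new IllegalStateException("subpartition " + subpartition + " is empty");
        }
        backlogs[subpartition]--;
    }

    int backlog(int subpartition) {
        return backlogs[subpartition];
    }

    public static void main(String[] args) {
        // Limit of 2 buffers per subpartition; all output is skewed to subpartition 0.
        BacklogLimitedPartition partition = new BacklogLimitedPartition(2, 2);
        for (int i = 0; i < 3; i++) {
            partition.processRecord(0, 1);
        }
        // prints "backlog=3 available=false"
        System.out.println("backlog=" + partition.backlog(0) + " available=" + partition.isAvailable());
    }
}
```

In this sketch the limit is a soft bound because the check runs per record rather than per emitted buffer; enforcing it strictly would mean blocking in the middle of a record, which the sketch deliberately does not model.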

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
