zhuzhurk commented on code in PR #21861:
URL: https://github.com/apache/flink/pull/21861#discussion_r1101058296


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -75,54 +75,106 @@
      */
     private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768;
 
-    private final int maxParallelism;
-    private final int minParallelism;
+    private final int globalMaxParallelism;
+    private final int globalMinParallelism;
     private final long dataVolumePerTask;
-    private final int defaultSourceParallelism;
+    private final int globalDefaultSourceParallelism;
 
     private DefaultVertexParallelismAndInputInfosDecider(
-            int maxParallelism,
-            int minParallelism,
+            int globalMaxParallelism,
+            int globalMinParallelism,
             MemorySize dataVolumePerTask,
-            int defaultSourceParallelism) {
+            int globalDefaultSourceParallelism) {
 
-        checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0.");
+        checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0.");
         checkArgument(
-                maxParallelism >= minParallelism,
+                globalMaxParallelism >= globalMinParallelism,
                 "Maximum parallelism should be greater than or equal to the minimum parallelism.");
         checkArgument(
-                defaultSourceParallelism > 0,
+                globalDefaultSourceParallelism > 0,
                 "The default source parallelism must be larger than 0.");
         checkNotNull(dataVolumePerTask);
 
-        this.maxParallelism = maxParallelism;
-        this.minParallelism = minParallelism;
+        this.globalMaxParallelism = globalMaxParallelism;
+        this.globalMinParallelism = globalMinParallelism;
         this.dataVolumePerTask = dataVolumePerTask.getBytes();
-        this.defaultSourceParallelism = defaultSourceParallelism;
+        this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
     }
 
     @Override
     public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
             JobVertexID jobVertexId,
             List<BlockingResultInfo> consumedResults,
-            int initialParallelism) {
+            int initialParallelism,
+            int initialMaxParallelism) {
         checkArgument(
                 initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
                         || initialParallelism > 0);
+        checkArgument(initialMaxParallelism > 0 && initialMaxParallelism >= initialParallelism);
 
         if (consumedResults.isEmpty()) {
             // source job vertex
+            int defaultSourceParallelism = globalDefaultSourceParallelism;
+            if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                    && initialMaxParallelism < defaultSourceParallelism) {
+                LOG.info(
+                        "The initial maximum parallelism {} is smaller than the global default source parallelism {}. "
+                                + "Use {} as the final default parallelism of source job vertex {}.",
+                        initialMaxParallelism,
+                        defaultSourceParallelism,
+                        initialMaxParallelism,
+                        jobVertexId);
+                defaultSourceParallelism = initialMaxParallelism;
+            }
+
             int parallelism =
                     initialParallelism > 0 ? initialParallelism : defaultSourceParallelism;

Review Comment:
   It's better to skip computing `defaultSourceParallelism` when `initialParallelism > 0`.
   Maybe introduce a method `computeSourceParallelism(jobVertexId, maxParallelism)`?
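A rough, self-contained sketch of the suggested refactoring follows. It is not code from the PR: the class name `SourceParallelismSketch` and the exact helper signature are hypothetical, `String` stands in for `JobVertexID`, the local `PARALLELISM_DEFAULT` constant assumes `ExecutionConfig.PARALLELISM_DEFAULT` is `-1`, and `System.out.printf` replaces `LOG.info`. The point is that the default is only computed when the vertex has no user-specified parallelism.

```java
/** Hypothetical, self-contained sketch of the suggested refactoring; not the actual Flink code. */
final class SourceParallelismSketch {

    // Assumption: mirrors ExecutionConfig.PARALLELISM_DEFAULT, which is -1 in Flink.
    static final int PARALLELISM_DEFAULT = -1;

    private final int globalDefaultSourceParallelism;

    SourceParallelismSketch(int globalDefaultSourceParallelism) {
        if (globalDefaultSourceParallelism <= 0) {
            throw new IllegalArgumentException(
                    "The default source parallelism must be larger than 0.");
        }
        this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
    }

    /** Decides the parallelism of a source vertex; the default is only computed when needed. */
    int decideSourceParallelism(String jobVertexId, int initialParallelism, int maxParallelism) {
        if (initialParallelism != PARALLELISM_DEFAULT && initialParallelism <= 0) {
            throw new IllegalArgumentException("Invalid initial parallelism: " + initialParallelism);
        }
        // A user-specified parallelism is respected, so computeSourceParallelism is skipped.
        return initialParallelism > 0
                ? initialParallelism
                : computeSourceParallelism(jobVertexId, maxParallelism);
    }

    /** Hypothetical helper: caps the global default source parallelism at the max parallelism. */
    int computeSourceParallelism(String jobVertexId, int maxParallelism) {
        if (globalDefaultSourceParallelism > maxParallelism) {
            // Stand-in for LOG.info(...) in the real class.
            System.out.printf(
                    "The global default source parallelism %d is larger than the maximum parallelism %d. "
                            + "Use %d as the parallelism of source job vertex %s.%n",
                    globalDefaultSourceParallelism, maxParallelism, maxParallelism, jobVertexId);
            return maxParallelism;
        }
        return globalDefaultSourceParallelism;
    }
}
```

For example, with a global default of 8, a source vertex whose max parallelism is 4 would get 4, while a vertex with an explicit parallelism of 2 would keep 2 without the helper ever running.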



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDecider.java:
##########
@@ -41,10 +41,12 @@ public interface VertexParallelismAndInputInfosDecider {
      *     number, it will be respected. If it's not set(equals to {@link
      *     ExecutionConfig#PARALLELISM_DEFAULT}), a parallelism will be automatically decided for
      *     the vertex.
+     * @param initialMaxParallelism The initial max parallelism of the job vertex.
      * @return the parallelism and vertex input infos.
      */
     ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
             JobVertexID jobVertexId,
             List<BlockingResultInfo> consumedResults,
-            int initialParallelism);
+            int initialParallelism,
+            int initialMaxParallelism);

Review Comment:
   Maybe name them `vertexInitialParallelism` and `vertexMaxParallelism`? I think that would make the code easier to understand. Also, `initialMaxParallelism` suggests that the value is not yet final, which is not the case.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -75,54 +75,106 @@
      */
     private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768;
 
-    private final int maxParallelism;
-    private final int minParallelism;
+    private final int globalMaxParallelism;
+    private final int globalMinParallelism;
     private final long dataVolumePerTask;
-    private final int defaultSourceParallelism;
+    private final int globalDefaultSourceParallelism;
 
     private DefaultVertexParallelismAndInputInfosDecider(
-            int maxParallelism,
-            int minParallelism,
+            int globalMaxParallelism,
+            int globalMinParallelism,
             MemorySize dataVolumePerTask,
-            int defaultSourceParallelism) {
+            int globalDefaultSourceParallelism) {
 
-        checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0.");
+        checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0.");
         checkArgument(
-                maxParallelism >= minParallelism,
+                globalMaxParallelism >= globalMinParallelism,
                 "Maximum parallelism should be greater than or equal to the minimum parallelism.");
         checkArgument(
-                defaultSourceParallelism > 0,
+                globalDefaultSourceParallelism > 0,
                 "The default source parallelism must be larger than 0.");
         checkNotNull(dataVolumePerTask);
 
-        this.maxParallelism = maxParallelism;
-        this.minParallelism = minParallelism;
+        this.globalMaxParallelism = globalMaxParallelism;
+        this.globalMinParallelism = globalMinParallelism;
         this.dataVolumePerTask = dataVolumePerTask.getBytes();
-        this.defaultSourceParallelism = defaultSourceParallelism;
+        this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
     }
 
     @Override
     public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
             JobVertexID jobVertexId,
             List<BlockingResultInfo> consumedResults,
-            int initialParallelism) {
+            int initialParallelism,
+            int initialMaxParallelism) {
         checkArgument(
                 initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
                         || initialParallelism > 0);
+        checkArgument(initialMaxParallelism > 0 && initialMaxParallelism >= initialParallelism);
 
         if (consumedResults.isEmpty()) {
             // source job vertex
+            int defaultSourceParallelism = globalDefaultSourceParallelism;
+            if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                    && initialMaxParallelism < defaultSourceParallelism) {
+                LOG.info(
+                        "The initial maximum parallelism {} is smaller than the global default source parallelism {}. "
+                                + "Use {} as the final default parallelism of source job vertex {}.",

Review Comment:
   I think it's better to phrase it as "The global default source parallelism {} is larger than the maximum parallelism {}."
   Similarly, I feel `defaultSourceParallelism > initialMaxParallelism` (or `sourceParallelism > maxParallelism`) would read better than the current `initialMaxParallelism < defaultSourceParallelism`, though this is not a strict requirement.
   
   And, given my other comment, "Use {} as the parallelism of source job vertex {}."



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -75,54 +75,106 @@
      */
     private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768;
 
-    private final int maxParallelism;
-    private final int minParallelism;
+    private final int globalMaxParallelism;
+    private final int globalMinParallelism;
     private final long dataVolumePerTask;
-    private final int defaultSourceParallelism;
+    private final int globalDefaultSourceParallelism;
 
     private DefaultVertexParallelismAndInputInfosDecider(
-            int maxParallelism,
-            int minParallelism,
+            int globalMaxParallelism,
+            int globalMinParallelism,
             MemorySize dataVolumePerTask,
-            int defaultSourceParallelism) {
+            int globalDefaultSourceParallelism) {
 
-        checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0.");
+        checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0.");
         checkArgument(
-                maxParallelism >= minParallelism,
+                globalMaxParallelism >= globalMinParallelism,
                 "Maximum parallelism should be greater than or equal to the minimum parallelism.");
         checkArgument(
-                defaultSourceParallelism > 0,
+                globalDefaultSourceParallelism > 0,
                 "The default source parallelism must be larger than 0.");
         checkNotNull(dataVolumePerTask);
 
-        this.maxParallelism = maxParallelism;
-        this.minParallelism = minParallelism;
+        this.globalMaxParallelism = globalMaxParallelism;
+        this.globalMinParallelism = globalMinParallelism;
         this.dataVolumePerTask = dataVolumePerTask.getBytes();
-        this.defaultSourceParallelism = defaultSourceParallelism;
+        this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
     }
 
     @Override
     public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
             JobVertexID jobVertexId,
             List<BlockingResultInfo> consumedResults,
-            int initialParallelism) {
+            int initialParallelism,
+            int initialMaxParallelism) {
         checkArgument(
                 initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
                         || initialParallelism > 0);
+        checkArgument(initialMaxParallelism > 0 && initialMaxParallelism >= initialParallelism);
 
         if (consumedResults.isEmpty()) {
             // source job vertex
+            int defaultSourceParallelism = globalDefaultSourceParallelism;
+            if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                    && initialMaxParallelism < defaultSourceParallelism) {
+                LOG.info(
+                        "The initial maximum parallelism {} is smaller than the global default source parallelism {}. "
+                                + "Use {} as the final default parallelism of source job vertex {}.",
+                        initialMaxParallelism,
+                        defaultSourceParallelism,
+                        initialMaxParallelism,
+                        jobVertexId);
+                defaultSourceParallelism = initialMaxParallelism;
+            }
+
             int parallelism =
                     initialParallelism > 0 ? initialParallelism : defaultSourceParallelism;
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
-        } else if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                && areAllInputsAllToAll(consumedResults)
-                && !areAllInputsBroadcast(consumedResults)) {
-            return decideParallelismAndEvenlyDistributeData(
-                    jobVertexId, consumedResults, initialParallelism);
         } else {
-            return decideParallelismAndEvenlyDistributeSubpartitions(
-                    jobVertexId, consumedResults, initialParallelism);
+            int minParallelism = globalMinParallelism;
+            int maxParallelism = globalMaxParallelism;
+
+            if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                    && initialMaxParallelism < minParallelism) {
+                LOG.info(
+                        "The initial maximum parallelism {} is smaller than the global minimum parallelism {}. "
+                                + "Use {} as the final minimum parallelism of job vertex {}.",
+                        initialMaxParallelism,
+                        minParallelism,
+                        initialMaxParallelism,
+                        jobVertexId);
+                minParallelism = initialMaxParallelism;
+            }
+            if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                    && initialMaxParallelism < maxParallelism) {
+                LOG.info(
+                        "The initial maximum parallelism {} is smaller than the global maximum parallelism {}. "
+                                + "Use {} as the final maximum parallelism of job vertex {}.",

Review Comment:
   Maybe "Use {} as the upper bound to decide parallelism of job vertex {}."
   
   This is because "maximum parallelism of job vertex" may be misunderstood as `JobVertex#maxParallelism`, but the two are not the same thing.
   
   Then, maybe also change "the final minimum parallelism" to "the lower bound to decide parallelism".
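A small sketch of how the lower and upper bounds could be derived and logged with the suggested wording follows. It is hypothetical: `ParallelismBoundsSketch`, `lowerBound`, and `upperBound` are illustrative names, `String` stands in for `JobVertexID`, `System.out.printf` replaces `LOG.info`, and the first sentence of the message ("The global ... parallelism {} is larger than the maximum parallelism {}.") is an assumed analogue of the source-vertex suggestion. It also assumes the caller only invokes it when the parallelism is being auto-decided (the PR guards the clamping with `initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT`).

```java
/** Hypothetical sketch of deriving the bounds used to decide parallelism; not the actual Flink code. */
final class ParallelismBoundsSketch {

    private ParallelismBoundsSketch() {}

    /** Lower bound: the global minimum parallelism, capped at the vertex max parallelism. */
    static int lowerBound(String jobVertexId, int globalMinParallelism, int vertexMaxParallelism) {
        return capAt("minimum", "lower", jobVertexId, globalMinParallelism, vertexMaxParallelism);
    }

    /** Upper bound: the global maximum parallelism, capped at the vertex max parallelism. */
    static int upperBound(String jobVertexId, int globalMaxParallelism, int vertexMaxParallelism) {
        return capAt("maximum", "upper", jobVertexId, globalMaxParallelism, vertexMaxParallelism);
    }

    private static int capAt(
            String globalKind,
            String boundKind,
            String jobVertexId,
            int globalValue,
            int vertexMaxParallelism) {
        if (globalValue <= vertexMaxParallelism) {
            return globalValue;
        }
        // Stand-in for LOG.info(...), using the "lower/upper bound" wording suggested in the review.
        System.out.printf(
                "The global %s parallelism %d is larger than the maximum parallelism %d. "
                        + "Use %d as the %s bound to decide parallelism of job vertex %s.%n",
                globalKind, globalValue, vertexMaxParallelism, vertexMaxParallelism, boundKind, jobVertexId);
        return vertexMaxParallelism;
    }
}
```

Because the constructor already checks `globalMaxParallelism >= globalMinParallelism`, capping both values at the same vertex max parallelism keeps the lower bound at or below the upper bound.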



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
