zhuzhurk commented on code in PR #21861: URL: https://github.com/apache/flink/pull/21861#discussion_r1101058296
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java: ########## @@ -75,54 +75,106 @@ */ private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768; - private final int maxParallelism; - private final int minParallelism; + private final int globalMaxParallelism; + private final int globalMinParallelism; private final long dataVolumePerTask; - private final int defaultSourceParallelism; + private final int globalDefaultSourceParallelism; private DefaultVertexParallelismAndInputInfosDecider( - int maxParallelism, - int minParallelism, + int globalMaxParallelism, + int globalMinParallelism, MemorySize dataVolumePerTask, - int defaultSourceParallelism) { + int globalDefaultSourceParallelism) { - checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0."); + checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0."); checkArgument( - maxParallelism >= minParallelism, + globalMaxParallelism >= globalMinParallelism, "Maximum parallelism should be greater than or equal to the minimum parallelism."); checkArgument( - defaultSourceParallelism > 0, + globalDefaultSourceParallelism > 0, "The default source parallelism must be larger than 0."); checkNotNull(dataVolumePerTask); - this.maxParallelism = maxParallelism; - this.minParallelism = minParallelism; + this.globalMaxParallelism = globalMaxParallelism; + this.globalMinParallelism = globalMinParallelism; this.dataVolumePerTask = dataVolumePerTask.getBytes(); - this.defaultSourceParallelism = defaultSourceParallelism; + this.globalDefaultSourceParallelism = globalDefaultSourceParallelism; } @Override public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex( JobVertexID jobVertexId, List<BlockingResultInfo> consumedResults, - int initialParallelism) { + int initialParallelism, + int initialMaxParallelism) { checkArgument( initialParallelism == 
ExecutionConfig.PARALLELISM_DEFAULT || initialParallelism > 0); + checkArgument(initialMaxParallelism > 0 && initialMaxParallelism >= initialParallelism); if (consumedResults.isEmpty()) { // source job vertex + int defaultSourceParallelism = globalDefaultSourceParallelism; + if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && initialMaxParallelism < defaultSourceParallelism) { + LOG.info( + "The initial maximum parallelism {} is smaller than the global default source parallelism {}. " + + "Use {} as the final default parallelism of source job vertex {}.", + initialMaxParallelism, + defaultSourceParallelism, + initialMaxParallelism, + jobVertexId); + defaultSourceParallelism = initialMaxParallelism; + } + int parallelism = initialParallelism > 0 ? initialParallelism : defaultSourceParallelism; Review Comment: It's better to skip computing the `defaultSourceParallelism` if `initialParallelism > 0`. Maybe introduce a method "computeSourceParallelism(jobVertexId, maxParallelism)"? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDecider.java: ########## @@ -41,10 +41,12 @@ public interface VertexParallelismAndInputInfosDecider { * number, it will be respected. If it's not set(equals to {@link * ExecutionConfig#PARALLELISM_DEFAULT}), a parallelism will be automatically decided for * the vertex. + * @param initialMaxParallelism The initial max parallelism of the job vertex. * @return the parallelism and vertex input infos. */ ParallelismAndInputInfos decideParallelismAndInputInfosForVertex( JobVertexID jobVertexId, List<BlockingResultInfo> consumedResults, - int initialParallelism); + int initialParallelism, + int initialMaxParallelism); Review Comment: Maybe name them `vertexInitialParallelism` and `vertexMaxParallelism`? I feel this would make the code easier to understand. Also, the name `initialMaxParallelism` suggests that the value is not yet finalized, which is not the case.
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java: ########## @@ -75,54 +75,106 @@ */ private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768; - private final int maxParallelism; - private final int minParallelism; + private final int globalMaxParallelism; + private final int globalMinParallelism; private final long dataVolumePerTask; - private final int defaultSourceParallelism; + private final int globalDefaultSourceParallelism; private DefaultVertexParallelismAndInputInfosDecider( - int maxParallelism, - int minParallelism, + int globalMaxParallelism, + int globalMinParallelism, MemorySize dataVolumePerTask, - int defaultSourceParallelism) { + int globalDefaultSourceParallelism) { - checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0."); + checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0."); checkArgument( - maxParallelism >= minParallelism, + globalMaxParallelism >= globalMinParallelism, "Maximum parallelism should be greater than or equal to the minimum parallelism."); checkArgument( - defaultSourceParallelism > 0, + globalDefaultSourceParallelism > 0, "The default source parallelism must be larger than 0."); checkNotNull(dataVolumePerTask); - this.maxParallelism = maxParallelism; - this.minParallelism = minParallelism; + this.globalMaxParallelism = globalMaxParallelism; + this.globalMinParallelism = globalMinParallelism; this.dataVolumePerTask = dataVolumePerTask.getBytes(); - this.defaultSourceParallelism = defaultSourceParallelism; + this.globalDefaultSourceParallelism = globalDefaultSourceParallelism; } @Override public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex( JobVertexID jobVertexId, List<BlockingResultInfo> consumedResults, - int initialParallelism) { + int initialParallelism, + int initialMaxParallelism) { checkArgument( initialParallelism == 
ExecutionConfig.PARALLELISM_DEFAULT || initialParallelism > 0); + checkArgument(initialMaxParallelism > 0 && initialMaxParallelism >= initialParallelism); if (consumedResults.isEmpty()) { // source job vertex + int defaultSourceParallelism = globalDefaultSourceParallelism; + if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && initialMaxParallelism < defaultSourceParallelism) { + LOG.info( + "The initial maximum parallelism {} is smaller than the global default source parallelism {}. " + + "Use {} as the final default parallelism of source job vertex {}.", Review Comment: I think it's better to phrase it as "The global default source parallelism {} is larger than the maximum parallelism {}." Similarly, I feel `defaultSourceParallelism > initialMaxParallelism` (or `sourceParallelism > maxParallelism`) would read better, though this is not a strict requirement. Also, considering my other comment, the second sentence could be "Use {} as the parallelism of source job vertex {}." ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java: ########## @@ -75,54 +75,106 @@ */ private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768; - private final int maxParallelism; - private final int minParallelism; + private final int globalMaxParallelism; + private final int globalMinParallelism; private final long dataVolumePerTask; - private final int defaultSourceParallelism; + private final int globalDefaultSourceParallelism; private DefaultVertexParallelismAndInputInfosDecider( - int maxParallelism, - int minParallelism, + int globalMaxParallelism, + int globalMinParallelism, MemorySize dataVolumePerTask, - int defaultSourceParallelism) { + int globalDefaultSourceParallelism) { - checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0."); + checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0."); checkArgument( - maxParallelism >= 
minParallelism, + globalMaxParallelism >= globalMinParallelism, "Maximum parallelism should be greater than or equal to the minimum parallelism."); checkArgument( - defaultSourceParallelism > 0, + globalDefaultSourceParallelism > 0, "The default source parallelism must be larger than 0."); checkNotNull(dataVolumePerTask); - this.maxParallelism = maxParallelism; - this.minParallelism = minParallelism; + this.globalMaxParallelism = globalMaxParallelism; + this.globalMinParallelism = globalMinParallelism; this.dataVolumePerTask = dataVolumePerTask.getBytes(); - this.defaultSourceParallelism = defaultSourceParallelism; + this.globalDefaultSourceParallelism = globalDefaultSourceParallelism; } @Override public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex( JobVertexID jobVertexId, List<BlockingResultInfo> consumedResults, - int initialParallelism) { + int initialParallelism, + int initialMaxParallelism) { checkArgument( initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT || initialParallelism > 0); + checkArgument(initialMaxParallelism > 0 && initialMaxParallelism >= initialParallelism); if (consumedResults.isEmpty()) { // source job vertex + int defaultSourceParallelism = globalDefaultSourceParallelism; + if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && initialMaxParallelism < defaultSourceParallelism) { + LOG.info( + "The initial maximum parallelism {} is smaller than the global default source parallelism {}. " + + "Use {} as the final default parallelism of source job vertex {}.", + initialMaxParallelism, + defaultSourceParallelism, + initialMaxParallelism, + jobVertexId); + defaultSourceParallelism = initialMaxParallelism; + } + int parallelism = initialParallelism > 0 ? 
initialParallelism : defaultSourceParallelism; return new ParallelismAndInputInfos(parallelism, Collections.emptyMap()); - } else if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT - && areAllInputsAllToAll(consumedResults) - && !areAllInputsBroadcast(consumedResults)) { - return decideParallelismAndEvenlyDistributeData( - jobVertexId, consumedResults, initialParallelism); } else { - return decideParallelismAndEvenlyDistributeSubpartitions( - jobVertexId, consumedResults, initialParallelism); + int minParallelism = globalMinParallelism; + int maxParallelism = globalMaxParallelism; + + if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && initialMaxParallelism < minParallelism) { + LOG.info( + "The initial maximum parallelism {} is smaller than the global minimum parallelism {}. " + + "Use {} as the final minimum parallelism of job vertex {}.", + initialMaxParallelism, + minParallelism, + initialMaxParallelism, + jobVertexId); + minParallelism = initialMaxParallelism; + } + if (initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && initialMaxParallelism < maxParallelism) { + LOG.info( + "The initial maximum parallelism {} is smaller than the global maximum parallelism {}. " + + "Use {} as the final maximum parallelism of job vertex {}.", Review Comment: Maybe "Use {} as the upper bound to decide parallelism of job vertex {}.", because "maximum parallelism of job vertex" may be misunderstood as `JobVertex#maxParallelism`, but they are not the same thing. Then, maybe also change "the final minimum parallelism" to "the lower bound to decide parallelism". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org