zhuzhurk commented on a change in pull request #19003: URL: https://github.com/apache/flink/pull/19003#discussion_r822468908
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java ########## @@ -109,54 +116,90 @@ private int calculateParallelism(List<BlockingResultInfo> consumedResults) { + " Use {} as the size of broadcast data to decide the parallelism.", new MemorySize(broadcastBytes), new MemorySize(expectedMaxBroadcastBytes), - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(), + JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(), CAP_RATIO_OF_BROADCAST, new MemorySize(expectedMaxBroadcastBytes)); broadcastBytes = expectedMaxBroadcastBytes; } - int parallelism = + int initialParallelism = (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes)); + int parallelism = normalizeParallelism(initialParallelism); LOG.debug( "The size of broadcast data is {}, the size of non-broadcast data is {}, " - + "the initially decided parallelism is {}.", + + "the initially decided parallelism is {}, after normalize is {}", new MemorySize(broadcastBytes), new MemorySize(nonBroadcastBytes), + initialParallelism, parallelism); if (parallelism < minParallelism) { LOG.info( - "The initially decided parallelism {} is smaller than the minimum parallelism {} " - + "(which is configured by '{}'). Use {} as the finally decided parallelism.", + "The initially normalized parallelism {} is smaller than the normalized minimum parallelism {}. " + + "Use {} as the finally decided parallelism.", parallelism, minParallelism, - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(), minParallelism); parallelism = minParallelism; } else if (parallelism > maxParallelism) { LOG.info( - "The initially decided parallelism {} is larger than the maximum parallelism {} " - + "(which is configured by '{}'). Use {} as the finally decided parallelism.", + "The initially normalized parallelism {} is larger than the normalized maximum parallelism {}. 
" + + "Use {} as the finally decided parallelism.", parallelism, maxParallelism, - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(), maxParallelism); parallelism = maxParallelism; } return parallelism; } - public static DefaultVertexParallelismDecider from(Configuration configuration) { + @VisibleForTesting + int getMaxParallelism() { + return maxParallelism; + } + + @VisibleForTesting + int getMinParallelism() { + return minParallelism; + } + + static DefaultVertexParallelismDecider from(Configuration configuration) { + int maxParallelism = getNormalizedMaxParallelism(configuration); + int minParallelism = getNormalizedMinParallelism(configuration); + checkState( + maxParallelism >= minParallelism, + String.format( + "You should adjust '%s' and '%s' so that there is at least one power of 2 between them.", Review comment: >> You should adjust '%s' and '%s' so that there is at least one power of 2 between them. This error can also happen when max < min, not only when the range contains no power of 2. Maybe reword the message as "{max-parallelism} should be greater than or equal to {min-parallelism} and the range must contain at least one power of 2."? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org