[GitHub] [flink] zhuzhurk commented on a change in pull request #19003: [FLINK-26517][runtime] Normalize the decided parallelism to power of 2 when using adaptive batch scheduler

2022-03-09 Thread GitBox


zhuzhurk commented on a change in pull request #19003:
URL: https://github.com/apache/flink/pull/19003#discussion_r822468908



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -109,54 +116,90 @@ private int calculateParallelism(List 
consumedResults) {
 + " Use {} as the size of broadcast data to decide 
the parallelism.",
 new MemorySize(broadcastBytes),
 new MemorySize(expectedMaxBroadcastBytes),
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(),
 CAP_RATIO_OF_BROADCAST,
 new MemorySize(expectedMaxBroadcastBytes));
 
 broadcastBytes = expectedMaxBroadcastBytes;
 }
 
-int parallelism =
+int initialParallelism =
 (int) Math.ceil((double) nonBroadcastBytes / 
(dataVolumePerTask - broadcastBytes));
+int parallelism = normalizeParallelism(initialParallelism);
 
 LOG.debug(
 "The size of broadcast data is {}, the size of non-broadcast 
data is {}, "
-+ "the initially decided parallelism is {}.",
++ "the initially decided parallelism is {}, after 
normalize is {}",
 new MemorySize(broadcastBytes),
 new MemorySize(nonBroadcastBytes),
+initialParallelism,
 parallelism);
 
 if (parallelism < minParallelism) {
 LOG.info(
-"The initially decided parallelism {} is smaller than the 
minimum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is smaller than 
the normalized minimum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 minParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(),
 minParallelism);
 parallelism = minParallelism;
 } else if (parallelism > maxParallelism) {
 LOG.info(
-"The initially decided parallelism {} is larger than the 
maximum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is larger than 
the normalized maximum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 maxParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(),
 maxParallelism);
 parallelism = maxParallelism;
 }
 
 return parallelism;
 }
 
-public static DefaultVertexParallelismDecider from(Configuration 
configuration) {
+@VisibleForTesting
+int getMaxParallelism() {
+return maxParallelism;
+}
+
+@VisibleForTesting
+int getMinParallelism() {
+return minParallelism;
+}
+
+static DefaultVertexParallelismDecider from(Configuration configuration) {
+int maxParallelism = getNormalizedMaxParallelism(configuration);
+int minParallelism = getNormalizedMinParallelism(configuration);
+checkState(
+maxParallelism >= minParallelism,
+String.format(
+"You should adjust '%s' and '%s' so that there is at 
least one power of 2 between them.",

Review comment:
   > You should adjust '%s' and '%s' so that there is at least one power of 
2 between them.
   
   The error can also happen if max < min.
   Maybe use "Invalid configuration: {max-parallelism} should be greater than or 
equal to {min-parallelism} and the range must contain at least one power of 2." instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #19003: [FLINK-26517][runtime] Normalize the decided parallelism to power of 2 when using adaptive batch scheduler

2022-03-09 Thread GitBox


zhuzhurk commented on a change in pull request #19003:
URL: https://github.com/apache/flink/pull/19003#discussion_r822468908



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -109,54 +116,90 @@ private int calculateParallelism(List 
consumedResults) {
 + " Use {} as the size of broadcast data to decide 
the parallelism.",
 new MemorySize(broadcastBytes),
 new MemorySize(expectedMaxBroadcastBytes),
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(),
 CAP_RATIO_OF_BROADCAST,
 new MemorySize(expectedMaxBroadcastBytes));
 
 broadcastBytes = expectedMaxBroadcastBytes;
 }
 
-int parallelism =
+int initialParallelism =
 (int) Math.ceil((double) nonBroadcastBytes / 
(dataVolumePerTask - broadcastBytes));
+int parallelism = normalizeParallelism(initialParallelism);
 
 LOG.debug(
 "The size of broadcast data is {}, the size of non-broadcast 
data is {}, "
-+ "the initially decided parallelism is {}.",
++ "the initially decided parallelism is {}, after 
normalize is {}",
 new MemorySize(broadcastBytes),
 new MemorySize(nonBroadcastBytes),
+initialParallelism,
 parallelism);
 
 if (parallelism < minParallelism) {
 LOG.info(
-"The initially decided parallelism {} is smaller than the 
minimum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is smaller than 
the normalized minimum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 minParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(),
 minParallelism);
 parallelism = minParallelism;
 } else if (parallelism > maxParallelism) {
 LOG.info(
-"The initially decided parallelism {} is larger than the 
maximum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is larger than 
the normalized maximum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 maxParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(),
 maxParallelism);
 parallelism = maxParallelism;
 }
 
 return parallelism;
 }
 
-public static DefaultVertexParallelismDecider from(Configuration 
configuration) {
+@VisibleForTesting
+int getMaxParallelism() {
+return maxParallelism;
+}
+
+@VisibleForTesting
+int getMinParallelism() {
+return minParallelism;
+}
+
+static DefaultVertexParallelismDecider from(Configuration configuration) {
+int maxParallelism = getNormalizedMaxParallelism(configuration);
+int minParallelism = getNormalizedMinParallelism(configuration);
+checkState(
+maxParallelism >= minParallelism,
+String.format(
+"You should adjust '%s' and '%s' so that there is at 
least one power of 2 between them.",

Review comment:
   >> You should adjust '%s' and '%s' so that there is at least one power 
of 2 between them.
   
   The error can also happen if max < min.
   Maybe use "Invalid configuration: {max-parallelism} should be greater than or 
equal to {min-parallelism} and the range must contain at least one power of 2." instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #19003: [FLINK-26517][runtime] Normalize the decided parallelism to power of 2 when using adaptive batch scheduler

2022-03-09 Thread GitBox


zhuzhurk commented on a change in pull request #19003:
URL: https://github.com/apache/flink/pull/19003#discussion_r822468908



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -109,54 +116,90 @@ private int calculateParallelism(List 
consumedResults) {
 + " Use {} as the size of broadcast data to decide 
the parallelism.",
 new MemorySize(broadcastBytes),
 new MemorySize(expectedMaxBroadcastBytes),
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(),
 CAP_RATIO_OF_BROADCAST,
 new MemorySize(expectedMaxBroadcastBytes));
 
 broadcastBytes = expectedMaxBroadcastBytes;
 }
 
-int parallelism =
+int initialParallelism =
 (int) Math.ceil((double) nonBroadcastBytes / 
(dataVolumePerTask - broadcastBytes));
+int parallelism = normalizeParallelism(initialParallelism);
 
 LOG.debug(
 "The size of broadcast data is {}, the size of non-broadcast 
data is {}, "
-+ "the initially decided parallelism is {}.",
++ "the initially decided parallelism is {}, after 
normalize is {}",
 new MemorySize(broadcastBytes),
 new MemorySize(nonBroadcastBytes),
+initialParallelism,
 parallelism);
 
 if (parallelism < minParallelism) {
 LOG.info(
-"The initially decided parallelism {} is smaller than the 
minimum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is smaller than 
the normalized minimum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 minParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(),
 minParallelism);
 parallelism = minParallelism;
 } else if (parallelism > maxParallelism) {
 LOG.info(
-"The initially decided parallelism {} is larger than the 
maximum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is larger than 
the normalized maximum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 maxParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(),
 maxParallelism);
 parallelism = maxParallelism;
 }
 
 return parallelism;
 }
 
-public static DefaultVertexParallelismDecider from(Configuration 
configuration) {
+@VisibleForTesting
+int getMaxParallelism() {
+return maxParallelism;
+}
+
+@VisibleForTesting
+int getMinParallelism() {
+return minParallelism;
+}
+
+static DefaultVertexParallelismDecider from(Configuration configuration) {
+int maxParallelism = getNormalizedMaxParallelism(configuration);
+int minParallelism = getNormalizedMinParallelism(configuration);
+checkState(
+maxParallelism >= minParallelism,
+String.format(
+"You should adjust '%s' and '%s' so that there is at 
least one power of 2 between them.",

Review comment:
   >> You should adjust '%s' and '%s' so that there is at least one power 
of 2 between them.
   
   The error can also happen if max < min.
   Maybe use "{max-parallelism} should be greater than or equal to 
{min-parallelism} and the range must contain at least one power of 2." instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #19003: [FLINK-26517][runtime] Normalize the decided parallelism to power of 2 when using adaptive batch scheduler

2022-03-08 Thread GitBox


zhuzhurk commented on a change in pull request #19003:
URL: https://github.com/apache/flink/pull/19003#discussion_r821468782



##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##
@@ -531,7 +531,9 @@
 .withDescription(
 Description.builder()
 .text(
-"The lower bound of allowed 
parallelism to set adaptively if %s has been set to %s",
+"The lower bound of allowed 
parallelism to set adaptively if %s has been set to %s. "
++ "Currently, this option 
should be configured as a power of 2, "
++ "otherwise it will also 
be rounded up to a power of 2 by framework.",

Review comment:
   Maybe replace "by framework" with "automatically".

##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##
@@ -556,14 +560,16 @@
 Documentation.Sections.EXPERT_SCHEDULING,
 Documentation.Sections.ALL_JOB_MANAGER
 })
-public static final ConfigOption 
ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK =
-key("jobmanager.adaptive-batch-scheduler.data-volume-per-task")
+public static final ConfigOption 
ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK =
+key("jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task")
 .memoryType()
 .defaultValue(MemorySize.ofMebiBytes(1024))
 .withDescription(
 Description.builder()
 .text(
-"The size of data volume to expect 
each task instance to process if %s has been set to %s",
+"The average size of data volume 
to expect each task instance to process if %s has been set to %s. "

Review comment:
   Maybe also note that the data actually processed by some tasks may be 
much larger than this value if there is data skew, or if the total data is so large 
that the parallelism has already reached its upper bound?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -109,54 +112,85 @@ private int calculateParallelism(List 
consumedResults) {
 + " Use {} as the size of broadcast data to decide 
the parallelism.",
 new MemorySize(broadcastBytes),
 new MemorySize(expectedMaxBroadcastBytes),
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(),
 CAP_RATIO_OF_BROADCAST,
 new MemorySize(expectedMaxBroadcastBytes));
 
 broadcastBytes = expectedMaxBroadcastBytes;
 }
 
-int parallelism =
+int initialParallelism =
 (int) Math.ceil((double) nonBroadcastBytes / 
(dataVolumePerTask - broadcastBytes));
+int parallelism = normalizeParallelism(initialParallelism);
 
 LOG.debug(
 "The size of broadcast data is {}, the size of non-broadcast 
data is {}, "
-+ "the initially decided parallelism is {}.",
++ "the initially decided parallelism is {}, after 
normalize is {}",
 new MemorySize(broadcastBytes),
 new MemorySize(nonBroadcastBytes),
+initialParallelism,
 parallelism);
 
 if (parallelism < minParallelism) {
 LOG.info(
-"The initially decided parallelism {} is smaller than the 
minimum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is smaller than 
the normalized minimum parallelism {}. "
++ "Use {} as the finally decided parallelism.",
 parallelism,
 minParallelism,
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(),
 minParallelism);
 parallelism = minParallelism;
 } else if (parallelism > maxParallelism) {
 LOG.info(
-"The initially decided parallelism {} is larger than the 
maximum parallelism {} "
-+ "(which is configured by '{}'). Use {} as the 
finally decided parallelism.",
+"The initially normalized parallelism {} is larger than 
the normalized maximum parallelism {}. "
++ "Use {} as the finally decided parallelism.",