This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2ecd1aeb493a6ddd8cea4dc983dc655085f486c3
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 14 15:41:57 2019 +0100

    [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java | 10 ++--------
 .../streaming/api/environment/StreamExecutionEnvironment.java | 10 ++--------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1c6fca1..5b07843 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -801,15 +801,9 @@ public class ExecutionEnvironment {
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
-		final int execParallelism = getParallelism();
-		if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			return;
+		if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+			configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
 		}
-
-		// if parallelism is set in the ExecutorConfig, then
-		// that value takes precedence over any other value.
-
-		configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 721869c..3870b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1556,15 +1556,9 @@ public class StreamExecutionEnvironment {
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
-		final int execParallelism = getParallelism();
-		if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			return;
+		if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+			configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
 		}
-
-		// if parallelism is set in the ExecutorConfig, then
-		// that value takes precedence over any other value.
-
-		configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
 	}
 
 	/**
