This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ecd1aeb493a6ddd8cea4dc983dc655085f486c3
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 14 15:41:57 2019 +0100

    [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java   | 10 ++--------
 .../streaming/api/environment/StreamExecutionEnvironment.java  | 10 ++--------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1c6fca1..5b07843 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -801,15 +801,9 @@ public class ExecutionEnvironment {
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
-               final int execParallelism = getParallelism();
-               if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-                       return;
+               if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+                       configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
                }
-
-               // if parallelism is set in the ExecutorConfig, then
-               // that value takes precedence over any other value.
-
-               configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
        }
 
        /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 721869c..3870b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1556,15 +1556,9 @@ public class StreamExecutionEnvironment {
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
-               final int execParallelism = getParallelism();
-               if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-                       return;
+               if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+                       configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
                }
-
-               // if parallelism is set in the ExecutorConfig, then
-               // that value takes precedence over any other value.
-
-               configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
        }
 
        /**

Reply via email to