This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98d3b39d2d676eecd98a095d564226ac139332e4
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Jan 20 14:49:08 2020 +0100

    [FLINK-15690][core] In environments, call configure() in constructors with 
passed Configuration
    
    This change means it is possible to instantiate ExecutionEnvironment &
    StreamExecutionEnvironment and apply a Configuration. Effectively this
    enables configuring ExecutionConfig, CheckpointConfig and parameters
    from environment via flink-conf.yaml.
    
    This closes #10925
---
 .../flink/client/program/ContextEnvironment.java   |  6 ----
 .../apache/flink/api/common/ExecutionConfig.java   |  2 +-
 .../flink/api/common/cache/DistributedCache.java   | 31 ++++++++++++++++++
 .../flink/configuration/PipelineOptions.java       |  2 ++
 .../flink/api/java/ExecutionEnvironment.java       | 37 ++++++++++++++++++----
 .../test_execution_environment_completeness.py     |  2 +-
 .../runtime/webmonitor/WebFrontendITCase.java      |  2 +-
 .../flink/api/scala/ExecutionEnvironment.scala     | 19 ++++++++++-
 .../environment/StreamExecutionEnvironment.java    | 32 ++++---------------
 ...treamExecutionEnvironmentConfigurationTest.java |  2 +-
 10 files changed, 92 insertions(+), 43 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index fa6a397..824f4fb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
@@ -47,11 +46,6 @@ public class ContextEnvironment extends ExecutionEnvironment 
{
                        final Configuration configuration,
                        final ClassLoader userCodeClassLoader) {
                super(executorServiceLoader, configuration, 
userCodeClassLoader);
-
-               final int parallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-               if (parallelism > 0) {
-                       setParallelism(parallelism);
-               }
        }
 
        @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 969ea22..291bced 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -99,7 +99,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
 
        private ClosureCleanerLevel closureCleanerLevel = 
ClosureCleanerLevel.RECURSIVE;
 
-       private int parallelism = PARALLELISM_DEFAULT;
+       private int parallelism = 
CoreOptions.DEFAULT_PARALLELISM.defaultValue();
 
        /**
         * The program wide maximum parallelism used for operators which 
haven't specified a maximum
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 3189159..079cfab 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.cache;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
@@ -28,12 +29,15 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /**
  * DistributedCache provides static methods to write the registered cache 
files into job configuration or decode
@@ -183,6 +187,33 @@ public class DistributedCache {
                return cacheFiles.entrySet();
        }
 
+       /**
+        * Parses a list of distributed cache entries encoded in a string. Can 
be used to parse a config option
+        * described by {@link 
org.apache.flink.configuration.PipelineOptions#CACHED_FILES}.
+        *
+        * <p>See {@link 
org.apache.flink.configuration.PipelineOptions#CACHED_FILES} for the format.
+        *
+        * @param files List of string encoded distributed cache entries.
+        */
+       public static List<Tuple2<String, DistributedCacheEntry>> 
parseCachedFilesFromString(List<String> files) {
+               return files.stream()
+                       .map(v -> Arrays.stream(v.split(","))
+                               .map(p -> p.split(":"))
+                               .collect(
+                                       Collectors.toMap(
+                                               arr -> arr[0], // key name
+                                               arr -> arr[1] // value
+                                       )
+                               )
+                       )
+                       .map(m -> Tuple2.of(
+                               m.get("name"),
+                               new DistributedCacheEntry(
+                                       m.get("path"),
+                                       
Optional.ofNullable(m.get("executable")).map(Boolean::parseBoolean).orElse(false)))
+                       ).collect(Collectors.toList());
+       }
+
        private static final String CACHE_FILE_NUM = 
"DISTRIBUTED_CACHE_FILE_NUM";
 
        private static final String CACHE_FILE_NAME = 
"DISTRIBUTED_CACHE_FILE_NAME_";
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index df394f5..694470d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -225,5 +225,7 @@ public class PipelineOptions {
                                        "accessible from any user-defined 
function in the (distributed) runtime under a local path. " +
                                        "Files may be local files (which will 
be distributed via BlobServer), or files in a distributed " +
                                        "file system. The runtime will copy the 
files temporarily to a local cache, if needed.")
+                               .linebreak()
+                               
.add(TextElement.code("name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`"))
                                .build());
 }
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 20f942f..3b19bca 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.cache.DistributedCache;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
@@ -50,8 +51,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
@@ -166,15 +168,16 @@ public class ExecutionEnvironment {
                this.configuration = checkNotNull(configuration);
                this.userClassloader = userClassloader == null ? 
getClass().getClassLoader() : userClassloader;
 
-               // the parallelism of a job or an operator can only be 
specified at the following places:
-               //     i) at the operator level using the 
SingleOutputStreamOperator.setParallelism().
-               //     ii) programmatically by using the env.setParallelism() 
method
+               // the configuration of a job or an operator can be specified 
at the following places:
+               //     i) at the operator level using e.g. parallelism using 
the SingleOutputStreamOperator.setParallelism().
+               //     ii) programmatically by using e.g. the 
env.setRestartStrategy() method
+               //     iii) in the configuration passed here
                //
                // if specified in multiple places, the priority order is the 
above.
                //
-               // Given this, it is safe to overwrite the execution config 
default value here because all other ways assume
+               // Given this, it is safe to overwrite the execution config 
default values here because all other ways assume
                // that the env is already instantiated so they will overwrite 
the value passed here.
-               
this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+               this.configure(this.configuration, this.userClassloader);
        }
 
        /**
@@ -379,6 +382,28 @@ public class ExecutionEnvironment {
                }
        }
 
+       /**
+        * Sets all relevant options contained in the {@link ReadableConfig} 
such as e.g.
+        * {@link PipelineOptions#CACHED_FILES}. It will reconfigure
+        * {@link ExecutionEnvironment} and {@link ExecutionConfig}.
+        *
+        * <p>It will change the value of a setting only if a corresponding 
option was set in the
+        * {@code configuration}. If a key is not present, the current value of 
a field will remain
+        * untouched.
+        *
+        * @param configuration a configuration to read the values from
+        * @param classLoader a class loader to use when loading classes
+        */
+       @PublicEvolving
+       public void configure(ReadableConfig configuration, ClassLoader 
classLoader) {
+               configuration.getOptional(PipelineOptions.CACHED_FILES)
+                       .ifPresent(f -> {
+                               this.cacheFile.clear();
+                               
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
+                       });
+               config.configure(configuration, classLoader);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Data set creations
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py 
b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
index 02f83fd6..c1c94c8 100644
--- 
a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
+++ 
b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
@@ -51,7 +51,7 @@ class 
ExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'createCollectionsEnvironment', 'readFile', 
'readFileOfPrimitives',
                 'generateSequence', 'areExplicitEnvironmentsAllowed', 
'createInput',
                 'getUserCodeClassLoader', 'getExecutorServiceLoader', 
'getConfiguration',
-                'executeAsync', 'registerJobListener', 'clearJobListeners'}
+                'executeAsync', 'registerJobListener', 'clearJobListeners', 
'configure'}
 
 
 if __name__ == '__main__':
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index f25aeb1..3df9517 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -287,7 +287,7 @@ public class WebFrontendITCase extends TestLogger {
                        assertEquals("application/json; charset=UTF-8", 
response.getType());
                        assertEquals("{\"jid\":\"" + jid + 
"\",\"name\":\"Stoppable streaming test job\"," +
                                
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster
 level default restart strategy\"," +
-                               
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", 
response.getContent());
+                               
"\"job-parallelism\":1,\"object-reuse-mode\":false,\"user-config\":{}}}", 
response.getContent());
                }
 
                BlockingInvokable.reset();
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 15585b3..4d14d9b 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, 
ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment 
=> JavaEnv}
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.core.execution.{JobClient, JobListener, 
PipelineExecutor}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
@@ -193,6 +193,23 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Sets all relevant options contained in the [[ReadableConfig]] such as e.g.
+   * [[org.apache.flink.configuration.PipelineOptions#CACHED_FILES]]. It will 
reconfigure
+   * [[ExecutionEnvironment]] and [[ExecutionConfig]].
+   *
+   * It will change the value of a setting only if a corresponding option was 
set in the
+   * `configuration`. If a key is not present, the current value of a field 
will remain
+   * untouched.
+   *
+   * @param configuration a configuration to read the values from
+   * @param classLoader   a class loader to use when loading classes
+   */
+  @PublicEvolving
+  def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit 
= {
+    javaEnv.configure(configuration, classLoader)
+  }
+
+  /**
    * Creates a DataSet of Strings produced by reading the given file line wise.
    *
    * @param filePath The path of the file, as a URI (e.g., 
"file:///some/local/file" or
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 49a8fe9..e16a4da 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -46,7 +46,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
@@ -104,7 +103,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -208,16 +206,16 @@ public class StreamExecutionEnvironment {
                this.configuration = checkNotNull(configuration);
                this.userClassloader = userClassloader == null ? 
getClass().getClassLoader() : userClassloader;
 
-               // the parallelism of a job or an operator can only be 
specified at the following places:
-               //     i) at the operator level using the 
SingleOutputStreamOperator.setParallelism().
-               //     ii) programmatically by using the env.setParallelism() 
method, or
+               // the configuration of a job or an operator can be specified 
at the following places:
+               //     i) at the operator level using e.g. parallelism using 
the SingleOutputStreamOperator.setParallelism().
+               //     ii) programmatically by using e.g. the 
env.setRestartStrategy() method
                //     iii) in the configuration passed here
                //
                // if specified in multiple places, the priority order is the 
above.
                //
-               // Given this, it is safe to overwrite the execution config 
default value here because all other ways assume
+               // Given this, it is safe to overwrite the execution config 
default values here because all other ways assume
                // that the env is already instantiated so they will overwrite 
the value passed here.
-               
this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+               this.configure(this.configuration, this.userClassloader);
        }
 
        protected Configuration getConfiguration() {
@@ -758,7 +756,7 @@ public class StreamExecutionEnvironment {
                configuration.getOptional(PipelineOptions.CACHED_FILES)
                        .ifPresent(f -> {
                                this.cacheFile.clear();
-                               parseCachedFiles(f).forEach(t -> 
registerCachedFile(t.f1, t.f0, t.f2));
+                               
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
                        });
                config.configure(configuration, classLoader);
                checkpointCfg.configure(configuration);
@@ -775,24 +773,6 @@ public class StreamExecutionEnvironment {
                }
        }
 
-       private List<Tuple3<String, String, Boolean>> 
parseCachedFiles(List<String> s) {
-               return s.stream()
-                       .map(v -> Arrays.stream(v.split(","))
-                               .map(p -> p.split(":"))
-                               .collect(
-                                       Collectors.toMap(
-                                               arr -> arr[0], // key name
-                                               arr -> arr[1] // value
-                                       )
-                               )
-                       )
-                       .map(m -> Tuple3.of(
-                               m.get("name"),
-                               m.get("path"),
-                               
Optional.ofNullable(m.get("executable")).map(Boolean::parseBoolean).orElse(false)))
-                       .collect(Collectors.toList());
-       }
-
        // 
--------------------------------------------------------------------------------------------
        // Data stream creations
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java
index 0885c2f..ac9ddac 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java
@@ -64,7 +64,7 @@ public class StreamExecutionEnvironmentConfigurationTest {
                                .whenSetFromFile("pipeline.operator-chaining", 
"false")
                                .viaSetter((env, b) -> {
                                        if (b) {
-                                               throw new 
IllegalArgumentException("Cannot programatically enable operator chaining");
+                                               throw new 
IllegalArgumentException("Cannot programmatically enable operator chaining");
                                        } else {
                                                env.disableOperatorChaining();
                                        }

Reply via email to