This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98d3b39d2d676eecd98a095d564226ac139332e4 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon Jan 20 14:49:08 2020 +0100 [FLINK-15690][core] In environments, call configure() in constructors with passed Configuration This change means it is possible to instantiate ExecutionEnvironment & StreamExecutionEnvironment and apply a Configuration. Effectively this enables configuring ExecutionConfig, CheckpointConfig and parameters from environment via flink-conf.yaml. This closes #10925 --- .../flink/client/program/ContextEnvironment.java | 6 ---- .../apache/flink/api/common/ExecutionConfig.java | 2 +- .../flink/api/common/cache/DistributedCache.java | 31 ++++++++++++++++++ .../flink/configuration/PipelineOptions.java | 2 ++ .../flink/api/java/ExecutionEnvironment.java | 37 ++++++++++++++++++---- .../test_execution_environment_completeness.py | 2 +- .../runtime/webmonitor/WebFrontendITCase.java | 2 +- .../flink/api/scala/ExecutionEnvironment.scala | 19 ++++++++++- .../environment/StreamExecutionEnvironment.java | 32 ++++--------------- ...treamExecutionEnvironmentConfigurationTest.java | 2 +- 10 files changed, 92 insertions(+), 43 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index fa6a397..824f4fb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.DetachedJobExecutionResult; import 
org.apache.flink.core.execution.JobClient; @@ -47,11 +46,6 @@ public class ContextEnvironment extends ExecutionEnvironment { final Configuration configuration, final ClassLoader userCodeClassLoader) { super(executorServiceLoader, configuration, userCodeClassLoader); - - final int parallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); - if (parallelism > 0) { - setParallelism(parallelism); - } } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 969ea22..291bced 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -99,7 +99,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE; - private int parallelism = PARALLELISM_DEFAULT; + private int parallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue(); /** * The program wide maximum parallelism used for operators which haven't specified a maximum diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java index 3189159..079cfab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.cache; import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -28,12 +29,15 @@ import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; 
import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.stream.Collectors; /** * DistributedCache provides static methods to write the registered cache files into job configuration or decode @@ -183,6 +187,33 @@ public class DistributedCache { return cacheFiles.entrySet(); } + /** + * Parses a list of distributed cache entries encoded in a string. Can be used to parse a config option + * described by {@link org.apache.flink.configuration.PipelineOptions#CACHED_FILES}. + * + * <p>See {@link org.apache.flink.configuration.PipelineOptions#CACHED_FILES} for the format. + * + * @param files List of string encoded distributed cache entries. + */ + public static List<Tuple2<String, DistributedCacheEntry>> parseCachedFilesFromString(List<String> files) { + return files.stream() + .map(v -> Arrays.stream(v.split(",")) + .map(p -> p.split(":")) + .collect( + Collectors.toMap( + arr -> arr[0], // key name + arr -> arr[1] // value + ) + ) + ) + .map(m -> Tuple2.of( + m.get("name"), + new DistributedCacheEntry( + m.get("path"), + Optional.ofNullable(m.get("executable")).map(Boolean::parseBoolean).orElse(false))) + ).collect(Collectors.toList()); + } + private static final String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM"; private static final String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index df394f5..694470d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -225,5 +225,7 @@ public class PipelineOptions { "accessible from any user-defined function in the (distributed) runtime under a local path. 
" + "Files may be local files (which will be distributed via BlobServer), or files in a distributed " + "file system. The runtime will copy the files temporarily to a local cache, if needed.") + .linebreak() + .add(TextElement.code("name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`")) .build()); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 20f942f..3b19bca 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; @@ -50,8 +51,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.DetachedJobExecutionResult; @@ -166,15 +168,16 @@ public class ExecutionEnvironment { this.configuration = checkNotNull(configuration); this.userClassloader = userClassloader == null ? 
getClass().getClassLoader() : userClassloader; - // the parallelism of a job or an operator can only be specified at the following places: - // i) at the operator level using the SingleOutputStreamOperator.setParallelism(). - // ii) programmatically by using the env.setParallelism() method + // the configuration of a job or an operator can be specified at the following places: + // i) at the operator level, e.g. the parallelism via SingleOutputStreamOperator.setParallelism(). + // ii) programmatically, e.g. via the env.setRestartStrategy() method + // iii) in the configuration passed here // // if specified in multiple places, the priority order is the above. // - // Given this, it is safe to overwrite the execution config default value here because all other ways assume + // Given this, it is safe to overwrite the execution config default values here because all other ways assume // that the env is already instantiated so they will overwrite the value passed here. - this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM)); + this.configure(this.configuration, this.userClassloader); } /** @@ -379,6 +382,28 @@ public class ExecutionEnvironment { } } + /** + * Sets all relevant options contained in the {@link ReadableConfig} such as e.g. + * {@link PipelineOptions#CACHED_FILES}. It will reconfigure + * {@link ExecutionEnvironment} and {@link ExecutionConfig}. + * + * <p>It will change the value of a setting only if a corresponding option was set in the + * {@code configuration}. If a key is not present, the current value of a field will remain + * untouched.
+ * + * @param configuration a configuration to read the values from + * @param classLoader a class loader to use when loading classes + */ + @PublicEvolving + public void configure(ReadableConfig configuration, ClassLoader classLoader) { + configuration.getOptional(PipelineOptions.CACHED_FILES) + .ifPresent(f -> { + this.cacheFile.clear(); + this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f)); + }); + config.configure(configuration, classLoader); + } + // -------------------------------------------------------------------------------------------- // Data set creations // -------------------------------------------------------------------------------------------- diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py index 02f83fd6..c1c94c8 100644 --- a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py +++ b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py @@ -51,7 +51,7 @@ class ExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, 'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives', 'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput', 'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration', - 'executeAsync', 'registerJobListener', 'clearJobListeners'} + 'executeAsync', 'registerJobListener', 'clearJobListeners', 'configure'} if __name__ == '__main__': diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index f25aeb1..3df9517 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -287,7 +287,7 @@ public class WebFrontendITCase 
extends TestLogger { assertEquals("application/json; charset=UTF-8", response.getType()); assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," + "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\"," + - "\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent()); + "\"job-parallelism\":1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent()); } BlockingInvokable.reset(); diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 15585b3..4d14d9b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.DataSource import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo} import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv} -import org.apache.flink.configuration.Configuration +import org.apache.flink.configuration.{Configuration, ReadableConfig} import org.apache.flink.core.execution.{JobClient, JobListener, PipelineExecutor} import org.apache.flink.core.fs.Path import org.apache.flink.types.StringValue @@ -193,6 +193,23 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { } /** + * Sets all relevant options contained in the [[ReadableConfig]] such as e.g. + * [[org.apache.flink.configuration.PipelineOptions#CACHED_FILES]]. It will reconfigure + * [[ExecutionEnvironment]] and [[ExecutionConfig]]. + * + * It will change the value of a setting only if a corresponding option was set in the + * `configuration`. 
If a key is not present, the current value of a field will remain + * untouched. + * + * @param configuration a configuration to read the values from + * @param classLoader a class loader to use when loading classes + */ + @PublicEvolving + def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit = { + javaEnv.configure(configuration, classLoader) + } + + /** * Creates a DataSet of Strings produced by reading the given file line wise. * * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 49a8fe9..e16a4da 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -46,7 +46,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.OptimizerPlanEnvironment; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; @@ -104,7 +103,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -208,16 +206,16 @@ public class StreamExecutionEnvironment { this.configuration = checkNotNull(configuration); this.userClassloader = userClassloader == null ? 
getClass().getClassLoader() : userClassloader; - // the parallelism of a job or an operator can only be specified at the following places: - // i) at the operator level using the SingleOutputStreamOperator.setParallelism(). - // ii) programmatically by using the env.setParallelism() method, or + // the configuration of a job or an operator can be specified at the following places: + // i) at the operator level, e.g. the parallelism via SingleOutputStreamOperator.setParallelism(). + // ii) programmatically, e.g. via the env.setRestartStrategy() method + // iii) in the configuration passed here // // if specified in multiple places, the priority order is the above. // - // Given this, it is safe to overwrite the execution config default value here because all other ways assume + // Given this, it is safe to overwrite the execution config default values here because all other ways assume // that the env is already instantiated so they will overwrite the value passed here. - this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM)); + this.configure(this.configuration, this.userClassloader); } protected Configuration getConfiguration() { @@ -758,7 +756,7 @@ public class StreamExecutionEnvironment { configuration.getOptional(PipelineOptions.CACHED_FILES) .ifPresent(f -> { this.cacheFile.clear(); - parseCachedFiles(f).forEach(t -> registerCachedFile(t.f1, t.f0, t.f2)); + this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f)); }); config.configure(configuration, classLoader); checkpointCfg.configure(configuration); @@ -775,24 +773,6 @@ public class StreamExecutionEnvironment { } } - private List<Tuple3<String, String, Boolean>> parseCachedFiles(List<String> s) { - return s.stream() - .map(v -> Arrays.stream(v.split(",")) - .map(p -> p.split(":")) - .collect( - Collectors.toMap( - arr -> arr[0], // key name - arr -> arr[1] // value - ) - ) - ) - .map(m -> Tuple3.of( - m.get("name"), - m.get("path"), - 
Optional.ofNullable(m.get("executable")).map(Boolean::parseBoolean).orElse(false))) - .collect(Collectors.toList()); - } - // -------------------------------------------------------------------------------------------- // Data stream creations // -------------------------------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java index 0885c2f..ac9ddac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentConfigurationTest.java @@ -64,7 +64,7 @@ public class StreamExecutionEnvironmentConfigurationTest { .whenSetFromFile("pipeline.operator-chaining", "false") .viaSetter((env, b) -> { if (b) { - throw new IllegalArgumentException("Cannot programatically enable operator chaining"); + throw new IllegalArgumentException("Cannot programmatically enable operator chaining"); } else { env.disableOperatorChaining(); }