This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57253c5fc880fff880526a8954446c8189ac7c72 Author: Stephan Ewen <se...@apache.org> AuthorDate: Fri Aug 27 02:27:10 2021 +0200 [FLINK-24255][tests] Test environments respect configuration when being instantiated. This closes #17240 --- .../streaming/util/TestStreamEnvironment.java | 13 ++++++++--- .../MiniClusterPipelineExecutorServiceLoader.java | 25 +++++++++++++++++++--- .../apache/flink/test/util/TestEnvironment.java | 4 +++- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index b59f598..00f5692 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -48,19 +48,26 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { public TestStreamEnvironment( MiniCluster miniCluster, + Configuration config, int parallelism, Collection<Path> jarFiles, Collection<URL> classPaths) { super( new MiniClusterPipelineExecutorServiceLoader(miniCluster), - MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths), + MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster( + config, jarFiles, classPaths), null); setParallelism(parallelism); } public TestStreamEnvironment(MiniCluster miniCluster, int parallelism) { - this(miniCluster, parallelism, Collections.emptyList(), Collections.emptyList()); + this( + miniCluster, + new Configuration(), + parallelism, + Collections.emptyList(), + Collections.emptyList()); } /** @@ -83,7 +90,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { conf -> { TestStreamEnvironment env = new TestStreamEnvironment( - miniCluster, parallelism, jarFiles, classpaths); + miniCluster, conf, parallelism, jarFiles, classpaths); randomizeConfiguration(miniCluster, conf); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java index 9bc2e60..b181d51 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java @@ -20,6 +20,7 @@ package org.apache.flink.test.util; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.deployment.executors.PipelineExecutorUtils; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -36,6 +37,9 @@ import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterJobClient; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -48,6 +52,10 @@ import java.util.stream.Stream; * PipelineExecutors} that use a given {@link MiniCluster}. */ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader { + + private static final Logger LOG = + LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class); + public static final String NAME = "minicluster"; private final MiniCluster miniCluster; @@ -60,9 +68,14 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto * Populates a {@link Configuration} that is compatible with this {@link * MiniClusterPipelineExecutorServiceLoader}. */ - public static Configuration createConfiguration( - Collection<Path> jarFiles, Collection<URL> classPaths) { - Configuration config = new Configuration(); + public static Configuration updateConfigurationForMiniCluster( + Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) { + + checkOverridesOption(config, PipelineOptions.JARS); + checkOverridesOption(config, PipelineOptions.CLASSPATHS); + checkOverridesOption(config, DeploymentOptions.TARGET); + checkOverridesOption(config, DeploymentOptions.ATTACHED); + ConfigUtils.encodeCollectionToConfig( config, PipelineOptions.JARS, @@ -75,6 +88,12 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto return config; } + private static void checkOverridesOption(Configuration config, ConfigOption<?> option) { + if (config.contains(option)) { + LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key()); + } + } + private static String getAbsoluteURL(Path path) { FileSystem fs; try { diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index ac7c311..b63d9c4 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.test.util; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.util.Preconditions; @@ -46,7 +47,8 @@ public class TestEnvironment extends ExecutionEnvironment { Collection<URL> classPaths) { super( new MiniClusterPipelineExecutorServiceLoader(miniCluster), - MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths), + MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster( + new Configuration(), jarFiles, classPaths), null); this.miniCluster = Preconditions.checkNotNull(miniCluster);