Repository: flink Updated Branches: refs/heads/master 40656c5df -> bd70a0001
[FLINK-7269] Refactor passing of dynamic properties This closes #4415. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd70a000 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd70a000 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd70a000 Branch: refs/heads/master Commit: bd70a00019f2c9fc01653d0229308635529aad73 Parents: 40656c5 Author: zjureel <zjur...@gmail.com> Authored: Fri Jul 28 14:53:39 2017 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Aug 19 18:42:14 2017 +0200 ---------------------------------------------------------------------- .../flink/configuration/Configuration.java | 2 +- .../configuration/GlobalConfiguration.java | 60 ++++++++++++-------- .../mesos/entrypoint/MesosEntrypointUtils.java | 22 ------- .../entrypoint/MesosJobClusterEntrypoint.java | 14 +++-- .../MesosSessionClusterEntrypoint.java | 14 +++-- .../entrypoint/MesosTaskExecutorRunner.java | 6 +- .../MesosApplicationMasterRunner.java | 3 +- .../MesosTaskManagerRunner.java | 3 +- 8 files changed, 64 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index d6f1dec..dfcd04f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -79,7 +79,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } // -------------------------------------------------------------------------------------------- - + /** * Returns the class associated with the given key as a string. * http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index ea9f8bf..4569ebe 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -28,6 +28,8 @@ import org.apache.flink.annotation.Internal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + /** * Global configuration object for Flink. Similar to Java properties configuration * objects it includes key-value pairs which represent the framework's configuration. @@ -46,24 +48,6 @@ public final class GlobalConfiguration { // -------------------------------------------------------------------------------------------- - private static Configuration dynamicProperties = null; - - /** - * Set the process-wide dynamic properties to be merged with the loaded configuration. - */ - public static void setDynamicProperties(Configuration dynamicProperties) { - GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties); - } - - /** - * Get the dynamic properties. - */ - public static Configuration getDynamicProperties() { - return GlobalConfiguration.dynamicProperties; - } - - // -------------------------------------------------------------------------------------------- - /** * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an * empty configuration object if the environment variable is not set. In production this variable is set but @@ -76,18 +60,30 @@ public final class GlobalConfiguration { if (configDir == null) { return new Configuration(); } - return loadConfiguration(configDir); + return loadConfiguration(configDir, null); } /** * Loads the configuration files from the specified directory. * <p> * YAML files are supported as configuration files. - * + * * @param configDir * the directory which contains the configuration files */ public static Configuration loadConfiguration(final String configDir) { + return loadConfiguration(configDir, null); + } + + /** + * Loads the configuration files from the specified directory. If the dynamic properties + * configuration is not null, then it is added to the loaded configuration. + * + * @param configDir directory to load the configuration from + * @param dynamicProperties configuration file containing the dynamic properties. Null if none. + * @return The configuration loaded from the given configuration directory + */ + public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) { if (configDir == null) { throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration"); @@ -109,13 +105,29 @@ public final class GlobalConfiguration { "' (" + confDirFile.getAbsolutePath() + ") does not exist."); } - Configuration conf = loadYAMLResource(yamlConfigFile); + Configuration configuration = loadYAMLResource(yamlConfigFile); - if(dynamicProperties != null) { - conf.addAll(dynamicProperties); + if (dynamicProperties != null) { + configuration.addAll(dynamicProperties); + } + + return configuration; + } + + /** + * Loads the global configuration and adds the given dynamic properties + * configuration. + * + * @param dynamicProperties The given dynamic properties + * @return Returns the loaded global configuration with dynamic properties + */ + public static Configuration loadConfigurationWithDynamicProperties(Configuration dynamicProperties) { + final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); + if (configDir == null) { + return new Configuration(dynamicProperties); } - return conf; + return loadConfiguration(configDir, dynamicProperties); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java index 0d81ead..368d62d 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java @@ -19,12 +19,10 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; import org.apache.flink.mesos.util.MesosConfiguration; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; @@ -34,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; -import org.apache.commons.cli.CommandLine; import org.apache.mesos.Protos; import org.slf4j.Logger; @@ -50,25 +47,6 @@ import scala.concurrent.duration.FiniteDuration; public class MesosEntrypointUtils { /** - * Loads the global configuration and adds the dynamic properties parsed from - * the given command line. - * - * @param cmd command line to parse for dynamic properties - * @return Global configuration with dynamic properties set - * @deprecated replace once FLINK-7269 has been merged - */ - @Deprecated - public static Configuration loadConfiguration(CommandLine cmd) { - - // merge the dynamic properties from the command-line - Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); - GlobalConfiguration.setDynamicProperties(dynamicProperties); - Configuration config = GlobalConfiguration.loadConfiguration(); - - return config; - } - - /** * Loads and validates the Mesos scheduler configuration. * @param flinkConfig the global configuration. * @param hostname the hostname to advertise to the Mesos master. http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java index 890c4a7..ba3b51d 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -74,6 +75,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { .addOption(BootstrapTools.newDynamicPropertiesOption()); } + private final Configuration dynamicProperties; + private MesosConfiguration schedulerConfiguration; private MesosServices mesosServices; @@ -82,8 +85,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { private ContainerSpecification taskManagerContainerSpec; - public MesosJobClusterEntrypoint(Configuration config) { + public MesosJobClusterEntrypoint(Configuration config, Configuration dynamicProperties) { super(config); + + this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties); } @Override @@ -100,7 +105,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { // TM configuration taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG); - taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties()); + taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties); } @Override @@ -195,9 +200,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { return; } - Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd); + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties); - MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration); + MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties); clusterEntrypoint.startCluster(); } http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java index 67f5899..0ee2680 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesCo import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -64,6 +65,8 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { .addOption(BootstrapTools.newDynamicPropertiesOption()); } + private final Configuration dynamicProperties; + private MesosConfiguration mesosConfig; private MesosServices mesosServices; @@ -72,8 +75,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { private ContainerSpecification taskManagerContainerSpec; - public MesosSessionClusterEntrypoint(Configuration config) { + public MesosSessionClusterEntrypoint(Configuration config, Configuration dynamicProperties) { super(config); + + this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties); } @Override @@ -90,7 +95,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { // TM configuration taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG); - taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties()); + taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties); } @Override @@ -169,9 +174,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { return; } - Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd); + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties); - MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration); + MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties); clusterEntrypoint.startCluster(); } http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java index c4343d2..897e26d 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java @@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; import org.apache.flink.runtime.clusterframework.BootstrapTools; @@ -71,7 +72,10 @@ public class MesosTaskExecutorRunner { final Configuration configuration; try { - configuration = MesosEntrypointUtils.loadConfiguration(cmd); + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + + configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties); } catch (Throwable t) { LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t); http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 3d16a66..c0a6855 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -152,8 +152,7 @@ public class MesosApplicationMasterRunner { CommandLine cmd = parser.parse(ALL_OPTIONS, args); final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); - GlobalConfiguration.setDynamicProperties(dynamicProperties); - final Configuration config = GlobalConfiguration.loadConfiguration(); + final Configuration config = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties); // configure the default filesystem try { http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java index e1b0efa..4236a43 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -73,10 +73,9 @@ public class MesosTaskManagerRunner { final Configuration configuration; try { final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); - GlobalConfiguration.setDynamicProperties(dynamicProperties); LOG.debug("Mesos dynamic properties: {}", dynamicProperties); - configuration = GlobalConfiguration.loadConfiguration(); + configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties); } catch (Throwable t) { LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);