Repository: flink
Updated Branches:
  refs/heads/master 40656c5df -> bd70a0001


[FLINK-7269] Refactor passing of dynamic properties

This closes #4415.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd70a000
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd70a000
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd70a000

Branch: refs/heads/master
Commit: bd70a00019f2c9fc01653d0229308635529aad73
Parents: 40656c5
Author: zjureel <zjur...@gmail.com>
Authored: Fri Jul 28 14:53:39 2017 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Aug 19 18:42:14 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  2 +-
 .../configuration/GlobalConfiguration.java      | 60 ++++++++++++--------
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 22 -------
 .../entrypoint/MesosJobClusterEntrypoint.java   | 14 +++--
 .../MesosSessionClusterEntrypoint.java          | 14 +++--
 .../entrypoint/MesosTaskExecutorRunner.java     |  6 +-
 .../MesosApplicationMasterRunner.java           |  3 +-
 .../MesosTaskManagerRunner.java                 |  3 +-
 8 files changed, 64 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d6f1dec..dfcd04f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -79,7 +79,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
        }
        
        // --------------------------------------------------------------------------------------------
-       
+
        /**
         * Returns the class associated with the given key as a string.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ea9f8bf..4569ebe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -28,6 +28,8 @@ import org.apache.flink.annotation.Internal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /**
  * Global configuration object for Flink. Similar to Java properties configuration
  * objects it includes key-value pairs which represent the framework's configuration.
@@ -46,24 +48,6 @@ public final class GlobalConfiguration {
 
        // --------------------------------------------------------------------------------------------
 
-       private static Configuration dynamicProperties = null;
-
-       /**
-        * Set the process-wide dynamic properties to be merged with the loaded configuration.
-     */
-       public static void setDynamicProperties(Configuration dynamicProperties) {
-               GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties);
-       }
-
-       /**
-        * Get the dynamic properties.
-     */
-       public static Configuration getDynamicProperties() {
-               return GlobalConfiguration.dynamicProperties;
-       }
-
-       // --------------------------------------------------------------------------------------------
-
        /**
         * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
         * empty configuration object if the environment variable is not set. In production this variable is set but
@@ -76,18 +60,30 @@ public final class GlobalConfiguration {
                if (configDir == null) {
                        return new Configuration();
                }
-               return loadConfiguration(configDir);
+               return loadConfiguration(configDir, null);
        }
 
        /**
         * Loads the configuration files from the specified directory.
         * <p>
         * YAML files are supported as configuration files.
-        * 
+        *
         * @param configDir
         *        the directory which contains the configuration files
         */
        public static Configuration loadConfiguration(final String configDir) {
+               return loadConfiguration(configDir, null);
+       }
+
+       /**
+        * Loads the configuration files from the specified directory. If the dynamic properties
+        * configuration is not null, then it is added to the loaded configuration.
+        *
+        * @param configDir directory to load the configuration from
+        * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
+        * @return The configuration loaded from the given configuration directory
+        */
+       public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {
 
                if (configDir == null) {
                        throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
@@ -109,13 +105,29 @@ public final class GlobalConfiguration {
                                        "' (" + confDirFile.getAbsolutePath() + ") does not exist.");
                }
 
-               Configuration conf = loadYAMLResource(yamlConfigFile);
+               Configuration configuration = loadYAMLResource(yamlConfigFile);
 
-               if(dynamicProperties != null) {
-                       conf.addAll(dynamicProperties);
+               if (dynamicProperties != null) {
+                       configuration.addAll(dynamicProperties);
+               }
+
+               return configuration;
+       }
+
+       /**
+        * Loads the global configuration and adds the given dynamic properties
+        * configuration.
+        *
+        * @param dynamicProperties The given dynamic properties
+        * @return Returns the loaded global configuration with dynamic properties
+        */
+       public static Configuration loadConfigurationWithDynamicProperties(Configuration dynamicProperties) {
+               final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+               if (configDir == null) {
+                       return new Configuration(dynamicProperties);
                }
 
-               return conf;
+               return loadConfiguration(configDir, dynamicProperties);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 0d81ead..368d62d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,12 +19,10 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -34,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 
-import org.apache.commons.cli.CommandLine;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 
@@ -50,25 +47,6 @@ import scala.concurrent.duration.FiniteDuration;
 public class MesosEntrypointUtils {
 
        /**
-        * Loads the global configuration and adds the dynamic properties parsed from
-        * the given command line.
-        *
-        * @param cmd command line to parse for dynamic properties
-        * @return Global configuration with dynamic properties set
-        * @deprecated replace once FLINK-7269 has been merged
-        */
-       @Deprecated
-       public static Configuration loadConfiguration(CommandLine cmd) {
-
-               // merge the dynamic properties from the command-line
-               Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-               GlobalConfiguration.setDynamicProperties(dynamicProperties);
-               Configuration config = GlobalConfiguration.loadConfiguration();
-
-               return config;
-       }
-
-       /**
         * Loads and validates the Mesos scheduler configuration.
         * @param flinkConfig the global configuration.
         * @param hostname the hostname to advertise to the Mesos master.

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 890c4a7..ba3b51d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -74,6 +75,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
                                .addOption(BootstrapTools.newDynamicPropertiesOption());
        }
 
+       private final Configuration dynamicProperties;
+
        private MesosConfiguration schedulerConfiguration;
 
        private MesosServices mesosServices;
@@ -82,8 +85,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
        private ContainerSpecification taskManagerContainerSpec;
 
-       public MesosJobClusterEntrypoint(Configuration config) {
+       public MesosJobClusterEntrypoint(Configuration config, Configuration dynamicProperties) {
                super(config);
+
+               this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties);
        }
 
        @Override
@@ -100,7 +105,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
                // TM configuration
                taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
-               taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+               taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
        }
 
        @Override
@@ -195,9 +200,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
                        return;
                }
 
-               Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+               Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+               Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 
-               MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration);
+               MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
 
                clusterEntrypoint.startCluster();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 67f5899..0ee2680 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesCo
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -64,6 +65,8 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
                                .addOption(BootstrapTools.newDynamicPropertiesOption());
        }
 
+       private final Configuration dynamicProperties;
+
        private MesosConfiguration mesosConfig;
 
        private MesosServices mesosServices;
@@ -72,8 +75,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
        private ContainerSpecification taskManagerContainerSpec;
 
-       public MesosSessionClusterEntrypoint(Configuration config) {
+       public MesosSessionClusterEntrypoint(Configuration config, Configuration dynamicProperties) {
                super(config);
+
+               this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties);
        }
 
        @Override
@@ -90,7 +95,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
                // TM configuration
                taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
-               taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+               taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
        }
 
        @Override
@@ -169,9 +174,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
                        return;
                }
 
-               Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+               Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+               Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 
-               MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration);
+               MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
                clusterEntrypoint.startCluster();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index c4343d2..897e26d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -71,7 +72,10 @@ public class MesosTaskExecutorRunner {
 
                final Configuration configuration;
                try {
-                       configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+                       Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+                       LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+                       configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
                }
                catch (Throwable t) {
                        LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 3d16a66..c0a6855 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -152,8 +152,7 @@ public class MesosApplicationMasterRunner {
                        CommandLine cmd = parser.parse(ALL_OPTIONS, args);
 
                        final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-                       GlobalConfiguration.setDynamicProperties(dynamicProperties);
-                       final Configuration config = GlobalConfiguration.loadConfiguration();
+                       final Configuration config = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 
                        // configure the default filesystem
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index e1b0efa..4236a43 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -73,10 +73,9 @@ public class MesosTaskManagerRunner {
                final Configuration configuration;
                try {
                        final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-                       GlobalConfiguration.setDynamicProperties(dynamicProperties);
                        LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
 
-                       configuration = GlobalConfiguration.loadConfiguration();
+                       configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
                }
                catch (Throwable t) {
                        LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);

Reply via email to