[FLINK-9762] Consolidate configuration cloning in BootstrapTools

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/221a2b38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/221a2b38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/221a2b38

Branch: refs/heads/release-1.5
Commit: 221a2b3833634065c13492ce141a8ba674e630d6
Parents: b505e52
Author: Till Rohrmann <[email protected]>
Authored: Wed Jul 18 15:45:15 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jul 19 11:48:23 2018 +0200

----------------------------------------------------------------------
 .../_includes/generated/core_configuration.html |  5 ---
 .../apache/flink/configuration/CoreOptions.java |  7 ----
 .../UnmodifiableConfiguration.java              |  6 ++++
 .../clusterframework/BootstrapTools.java        | 34 ++++++++++++++++----
 .../apache/flink/yarn/YarnResourceManager.java  |  9 ++----
 5 files changed, 37 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 8281adc..98cca91 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,11 +23,6 @@
             <td>Defines the class resolution strategy when loading classes 
from user code, meaning whether to first check the user code jar 
("child-first") or the application classpath ("parent-first"). The default 
settings indicate to load classes first from the user code jar, which means 
that user code jars can include and load different dependencies than Flink uses 
(transitively).</td>
         </tr>
         <tr>
-            <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>key, which says if default value is used for `io.tmp.dirs` 
config variable.</td>
-        </tr>
-        <tr>
             <td><h5>io.tmp.dirs</h5></td>
             <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. 
'_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in 
standalone.</td>
             <td></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 911f399..eb27b25 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -187,13 +187,6 @@ public class CoreOptions {
                        .defaultValue(System.getProperty("java.io.tmpdir"))
                        .withDeprecatedKeys("taskmanager.tmp.dirs");
 
-       /**
-        * String key, which says if default value is used for `io.tmp.dirs` 
config variable.
-        */
-       public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = 
key("internal.io.tmp.dirs.use-local-default")
-               .defaultValue(true)
-               .withDescription("key, which says if default value is used for 
`io.tmp.dirs` config variable.");
-
        // 
------------------------------------------------------------------------
        //  program
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index f92de1c..0a1bcc4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -65,6 +65,12 @@ public class UnmodifiableConfiguration extends Configuration {
                error();
        }
 
+       @Override
+       public <T> boolean removeConfig(ConfigOption<T> configOption) {
+               error();
+               return false;
+       }
+
        private void error(){
                throw new UnsupportedOperationException("The configuration is 
unmodifiable; its contents cannot be changed.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 3b606f7..ebe7140 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -60,11 +61,19 @@ import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /**
  * Tools for starting JobManager and TaskManager processes, including the
  * Actor Systems used to run the JobManager and TaskManager actors.
  */
 public class BootstrapTools {
+       /**
+        * Internal option which says if default value is used for {@link 
CoreOptions#TMP_DIRS}.
+        */
+       private static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmpdirs.use-local-default")
+               .defaultValue(false);
+
        private static final Logger LOG = 
LoggerFactory.getLogger(BootstrapTools.class);
 
        /**
@@ -235,7 +244,7 @@ public class BootstrapTools {
                                int numSlots,
                                FiniteDuration registrationTimeout) {
 
-               Configuration cfg = baseConfig.clone();
+               Configuration cfg = cloneConfiguration(baseConfig);
 
                if (jobManagerHostname != null && 
!jobManagerHostname.isEmpty()) {
                        cfg.setString(JobManagerOptions.ADDRESS, 
jobManagerHostname);
@@ -250,10 +259,6 @@ public class BootstrapTools {
                        cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlots);
                }
 
-               if 
(baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
-                       cfg.removeConfig(CoreOptions.TMP_DIRS);
-               }
-
                return cfg;
        }
 
@@ -482,11 +487,28 @@ public class BootstrapTools {
                if (configuration.contains(CoreOptions.TMP_DIRS)) {
                        LOG.info("Overriding Fink's temporary file directories 
with those " +
                                "specified in the Flink config: {}", 
configuration.getValue(CoreOptions.TMP_DIRS));
-                       
configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
                }
                else {
                        LOG.info("Setting directories for temporary files to: 
{}", defaultDirs);
                        configuration.setString(CoreOptions.TMP_DIRS, 
defaultDirs);
+                       configuration.setBoolean(USE_LOCAL_DEFAULT_TMP_DIRS, true);
+               }
+       }
+
+       /**
+        * Clones the given configuration and resets instance specific config 
options.
+        *
+        * @param configuration to clone
+        * @return Cloned configuration with reset instance specific config 
options
+        */
+       public static Configuration cloneConfiguration(Configuration configuration) {
+               final Configuration clonedConfiguration = new Configuration(configuration);
+
+               if (clonedConfiguration.getBoolean(USE_LOCAL_DEFAULT_TMP_DIRS)){
+                       clonedConfiguration.removeConfig(CoreOptions.TMP_DIRS);
+                       clonedConfiguration.removeConfig(USE_LOCAL_DEFAULT_TMP_DIRS);
                }
+
+               return clonedConfiguration;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6bea822..729cdef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -20,9 +20,9 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -471,15 +471,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
                                taskManagerParameters.taskManagerHeapSizeMB(),
                                
taskManagerParameters.taskManagerDirectMemoryLimitMB());
 
-               Configuration taskManagerConfig = flinkConfig.clone();
-               if 
(flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
-                       taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
-               }
+               Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
 
                log.debug("TaskManager configuration: {}", taskManagerConfig);
 
                ContainerLaunchContext taskExecutorLaunchContext = 
Utils.createTaskExecutorContext(
-                       taskManagerConfig,
+                       flinkConfig,
                        yarnConfig,
                        env,
                        taskManagerParameters,

Reply via email to