[FLINK-9762] Consolidate configuration cloning in BootstrapTools
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/221a2b38 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/221a2b38 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/221a2b38 Branch: refs/heads/release-1.5 Commit: 221a2b3833634065c13492ce141a8ba674e630d6 Parents: b505e52 Author: Till Rohrmann <[email protected]> Authored: Wed Jul 18 15:45:15 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Jul 19 11:48:23 2018 +0200 ---------------------------------------------------------------------- .../_includes/generated/core_configuration.html | 5 --- .../apache/flink/configuration/CoreOptions.java | 7 ---- .../UnmodifiableConfiguration.java | 6 ++++ .../clusterframework/BootstrapTools.java | 34 ++++++++++++++++---- .../apache/flink/yarn/YarnResourceManager.java | 9 ++---- 5 files changed, 37 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/docs/_includes/generated/core_configuration.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html index 8281adc..98cca91 100644 --- a/docs/_includes/generated/core_configuration.html +++ b/docs/_includes/generated/core_configuration.html @@ -23,11 +23,6 @@ <td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td> </tr> <tr> - <td><h5>internal.io.tmp.dirs.use-local-default</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td> - </tr> - <tr> <td><h5>io.tmp.dirs</h5></td> <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td> <td></td> http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 911f399..eb27b25 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -187,13 +187,6 @@ public class CoreOptions { .defaultValue(System.getProperty("java.io.tmpdir")) .withDeprecatedKeys("taskmanager.tmp.dirs"); - /** - * String key, which says if default value is used for `io.tmp.dirs` config variable. - */ - public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default") - .defaultValue(true) - .withDescription("key, which says if default value is used for `io.tmp.dirs` config variable."); - // ------------------------------------------------------------------------ // program // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java index f92de1c..0a1bcc4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java @@ -65,6 +65,12 @@ public class UnmodifiableConfiguration extends Configuration { error(); } + @Override + public <T> boolean removeConfig(ConfigOption<T> configOption) { + error(); + return false; + } + private void error(){ throw new UnsupportedOperationException("The configuration is unmodifiable; its contents cannot be changed."); } http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 3b606f7..ebe7140 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -60,11 +61,19 @@ import scala.Some; import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; +import static org.apache.flink.configuration.ConfigOptions.key; + /** * Tools for starting JobManager and TaskManager processes, including the * Actor Systems used to run the JobManager and TaskManager actors. */ public class BootstrapTools { + /** + * Internal option which says if default value is used for {@link CoreOptions#TMP_DIRS}. + */ + private static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmpdirs.use-local-default") + .defaultValue(false); + private static final Logger LOG = LoggerFactory.getLogger(BootstrapTools.class); /** @@ -235,7 +244,7 @@ public class BootstrapTools { int numSlots, FiniteDuration registrationTimeout) { - Configuration cfg = baseConfig.clone(); + Configuration cfg = cloneConfiguration(baseConfig); if (jobManagerHostname != null && !jobManagerHostname.isEmpty()) { cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname); @@ -250,10 +259,6 @@ public class BootstrapTools { cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots); } - if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){ - cfg.removeConfig(CoreOptions.TMP_DIRS); - } - return cfg; } @@ -482,11 +487,28 @@ public class BootstrapTools { if (configuration.contains(CoreOptions.TMP_DIRS)) { LOG.info("Overriding Fink's temporary file directories with those " + "specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS)); - configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false); } else { LOG.info("Setting directories for temporary files to: {}", defaultDirs); configuration.setString(CoreOptions.TMP_DIRS, defaultDirs); + configuration.setBoolean(USE_LOCAL_DEFAULT_TMP_DIRS, true); + } + } + + /** + * Clones the given configuration and resets instance specific config options. + * + * @param configuration to clone + * @return Cloned configuration with reset instance specific config options + */ + public static Configuration cloneConfiguration(Configuration configuration) { + final Configuration clonedConfiguration = new Configuration(configuration); + + if (clonedConfiguration.getBoolean(USE_LOCAL_DEFAULT_TMP_DIRS)){ + clonedConfiguration.removeConfig(CoreOptions.TMP_DIRS); + clonedConfiguration.removeConfig(USE_LOCAL_DEFAULT_TMP_DIRS); } + + return clonedConfiguration; } } http://git-wip-us.apache.org/repos/asf/flink/blob/221a2b38/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6bea822..729cdef 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -20,9 +20,9 @@ package org.apache.flink.yarn; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -471,15 +471,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme taskManagerParameters.taskManagerHeapSizeMB(), taskManagerParameters.taskManagerDirectMemoryLimitMB()); - Configuration taskManagerConfig = flinkConfig.clone(); - if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){ - taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS); - } + Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig); log.debug("TaskManager configuration: {}", taskManagerConfig); ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext( - taskManagerConfig, + flinkConfig, yarnConfig, env, taskManagerParameters,
