Repository: flink Updated Branches: refs/heads/release-1.5 5fe5545fd -> 248bb2c44
[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos This closes #6284. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b505e52e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b505e52e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b505e52e Branch: refs/heads/release-1.5 Commit: b505e52e3e4e3824b0f75c977b090c6d6cd24c62 Parents: 5fe5545 Author: Oleksandr Nitavskyi <[email protected]> Authored: Mon Jul 2 09:42:17 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Jul 19 11:48:00 2018 +0200 ---------------------------------------------------------------------- .../_includes/generated/core_configuration.html | 7 ++++- .../flink/configuration/Configuration.java | 26 ++++++++++++++++++ .../apache/flink/configuration/CoreOptions.java | 9 +++++- .../configuration/DelegatingConfiguration.java | 5 ++++ .../flink/configuration/ConfigurationTest.java | 29 ++++++++++++++++++++ .../mesos/entrypoint/MesosEntrypointUtils.java | 12 ++------ .../clusterframework/BootstrapTools.java | 24 +++++++++++++++- .../clusterframework/BootstrapToolsTest.java | 19 +++++++++++++ .../flink/yarn/YarnApplicationMasterRunner.java | 13 ++------- .../apache/flink/yarn/YarnResourceManager.java | 12 ++++++-- .../flink/yarn/YarnTaskExecutorRunner.java | 12 ++------ .../yarn/YarnTaskManagerRunnerFactory.java | 12 ++------ .../yarn/entrypoint/YarnEntrypointUtils.java | 13 ++------- 13 files changed, 135 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/docs/_includes/generated/core_configuration.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html index 91fa1a5..8281adc 100644 --- a/docs/_includes/generated/core_configuration.html +++ b/docs/_includes/generated/core_configuration.html @@ -23,8 +23,13 @@ <td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td> </tr> <tr> + <td><h5>internal.io.tmp.dirs.use-local-default</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td> + </tr> + <tr> <td><h5>io.tmp.dirs</h5></td> - <td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td> + <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td> <td></td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index 7d99fbb..00c4c38 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -729,6 +729,32 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } } + /** + * Removes given config option from the configuration. + * + * @param configOption config option to remove + * @param <T> Type of the config option + * @return true is config has been removed, false otherwise + */ + public <T> boolean removeConfig(ConfigOption<T> configOption){ + synchronized (this.confData){ + // try the current key + Object oldValue = this.confData.remove(configOption.key()); + if (oldValue == null){ + for (String deprecatedKey : configOption.deprecatedKeys()){ + oldValue = this.confData.remove(deprecatedKey); + if (oldValue != null){ + LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", + deprecatedKey, configOption.key()); + return true; + } + } + return false; + } + return true; + } + } + // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 42c1c9c..911f399 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -181,12 +181,19 @@ public class CoreOptions { * The config parameter defining the directories for temporary files, separated by * ",", "|", or the system's {@link java.io.File#pathSeparator}. */ - @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")") + @Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.") public static final ConfigOption<String> TMP_DIRS = key("io.tmp.dirs") .defaultValue(System.getProperty("java.io.tmpdir")) .withDeprecatedKeys("taskmanager.tmp.dirs"); + /** + * String key, which says if default value is used for `io.tmp.dirs` config variable. + */ + public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default") + .defaultValue(true) + .withDescription("key, which says if default value is used for `io.tmp.dirs` config variable."); + // ------------------------------------------------------------------------ // program // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java index 7b75c7a..1a637f6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -311,6 +311,11 @@ public final class DelegatingConfiguration extends Configuration { } @Override + public <T> boolean removeConfig(ConfigOption<T> configOption){ + return backingConfig.removeConfig(configOption); + } + + @Override public boolean containsKey(String key) { return backingConfig.containsKey(prefix + key); } http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index 232c829..3b98a44 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -303,4 +304,32 @@ public class ConfigurationTest extends TestLogger { assertEquals(13, cfg.getInteger(matchesThird)); assertEquals(-1, cfg.getInteger(notContained)); } + + @Test + public void testRemove(){ + Configuration cfg = new Configuration(); + cfg.setInteger("a", 1); + cfg.setInteger("b", 2); + + ConfigOption<Integer> validOption = ConfigOptions + .key("a") + .defaultValue(-1); + + ConfigOption<Integer> deprecatedOption = ConfigOptions + .key("c") + .defaultValue(-1) + .withDeprecatedKeys("d", "b"); + + ConfigOption<Integer> unexistedOption = ConfigOptions + .key("e") + .defaultValue(-1) + .withDeprecatedKeys("f", "g", "j"); + + assertEquals("Wrong expectation about size", cfg.keySet().size(), 2); + assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption)); + assertEquals("Wrong expectation about size", cfg.keySet().size(), 1); + assertTrue("Expected 'existedOption' is removed", cfg.removeConfig(deprecatedOption)); + assertEquals("Wrong expectation about size", cfg.keySet().size(), 0); + assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption)); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java index 498f435..2059c8e 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java @@ -19,13 +19,13 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; @@ -173,15 +173,7 @@ public class MesosEntrypointUtils { final Map<String, String> envs = System.getenv(); final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - log.info("Overriding Mesos' temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else if (tmpDirs != null) { - log.info("Setting directories for temporary files to: {}", tmpDirs); - configuration.setString(CoreOptions.TMP_DIRS, tmpDirs); - } + BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs); return configuration; } http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 7a8403a..3b606f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -250,7 +250,11 @@ public class BootstrapTools { cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots); } - return cfg; + if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){ + cfg.removeConfig(CoreOptions.TMP_DIRS); + } + + return cfg; } /** @@ -467,4 +471,22 @@ public class BootstrapTools { } return template; } + + /** + * Set temporary configuration directories if necessary + * + * @param configuration flink config to patch + * @param defaultDirs in case no tmp directories is set, next directories will be applied + */ + public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){ + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding Fink's temporary file directories with those " + + "specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS)); + configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false); + } + else { + LOG.info("Setting directories for temporary files to: {}", defaultDirs); + configuration.setString(CoreOptions.TMP_DIRS, defaultDirs); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java index cf38fea..7e31c8e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java @@ -276,4 +276,23 @@ public class BootstrapToolsTest { true, true, true, this.getClass())); } + + @Test + public void testUpdateTmpDirectoriesInConfiguration(){ + Configuration config = new Configuration(); + + // test that default value is taken + BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path"); + assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path"); + + // test that we ignore default value is value is set before + BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path"); + assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path"); + + //test that empty value is not a magic string + config.setString(CoreOptions.TMP_DIRS, ""); + BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path"); + assertEquals(config.getString(CoreOptions.TMP_DIRS), ""); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index eb977bf..497ac87 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -21,7 +21,6 @@ package org.apache.flink.yarn; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -523,16 +522,8 @@ public class YarnApplicationMasterRunner { ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX, ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX); - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - log.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - log.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } + final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); + BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs); return configuration; } http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 572e6ba..6bea822 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -20,6 +20,7 @@ package org.apache.flink.yarn; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -470,14 +471,19 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme taskManagerParameters.taskManagerHeapSizeMB(), taskManagerParameters.taskManagerDirectMemoryLimitMB()); - log.debug("TaskManager configuration: {}", flinkConfig); + Configuration taskManagerConfig = flinkConfig.clone(); + if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){ + taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS); + } + + log.debug("TaskManager configuration: {}", taskManagerConfig); ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext( - flinkConfig, + taskManagerConfig, yarnConfig, env, taskManagerParameters, - flinkConfig, + taskManagerConfig, currDir, YarnTaskExecutorRunner.class, log); http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 0c676e7..4102158 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -21,10 +21,10 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -100,15 +100,7 @@ public class YarnTaskExecutorRunner { final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } + BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs); // tell akka to die in case of an error configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java index d6f2364..9a01075 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java @@ -21,8 +21,8 @@ package org.apache.flink.yarn; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -121,15 +121,7 @@ public class YarnTaskManagerRunnerFactory { final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } + BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs); // tell akka to die in case of an error configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index 49957e1..5566963 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -20,7 +20,6 @@ package org.apache.flink.yarn.entrypoint; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -130,16 +129,8 @@ public class YarnEntrypointUtils { configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - log.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key()); - log.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } + final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key()); + BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs); return configuration; }
