Repository: flink
Updated Branches:
  refs/heads/release-1.5 5fe5545fd -> 248bb2c44


[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos

This closes #6284.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b505e52e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b505e52e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b505e52e

Branch: refs/heads/release-1.5
Commit: b505e52e3e4e3824b0f75c977b090c6d6cd24c62
Parents: 5fe5545
Author: Oleksandr Nitavskyi <[email protected]>
Authored: Mon Jul 2 09:42:17 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jul 19 11:48:00 2018 +0200

----------------------------------------------------------------------
 .../_includes/generated/core_configuration.html |  7 ++++-
 .../flink/configuration/Configuration.java      | 26 ++++++++++++++++++
 .../apache/flink/configuration/CoreOptions.java |  9 +++++-
 .../configuration/DelegatingConfiguration.java  |  5 ++++
 .../flink/configuration/ConfigurationTest.java  | 29 ++++++++++++++++++++
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 12 ++------
 .../clusterframework/BootstrapTools.java        | 24 +++++++++++++++-
 .../clusterframework/BootstrapToolsTest.java    | 19 +++++++++++++
 .../flink/yarn/YarnApplicationMasterRunner.java | 13 ++-------
 .../apache/flink/yarn/YarnResourceManager.java  | 12 ++++++--
 .../flink/yarn/YarnTaskExecutorRunner.java      | 12 ++------
 .../yarn/YarnTaskManagerRunnerFactory.java      | 12 ++------
 .../yarn/entrypoint/YarnEntrypointUtils.java    | 13 ++-------
 13 files changed, 135 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 91fa1a5..8281adc 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,8 +23,13 @@
             <td>Defines the class resolution strategy when loading classes 
from user code, meaning whether to first check the user code jar 
("child-first") or the application classpath ("parent-first"). The default 
settings indicate to load classes first from the user code jar, which means 
that user code jars can include and load different dependencies than Flink uses 
(transitively).</td>
         </tr>
         <tr>
+            <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>key, which says if default value is used for `io.tmp.dirs` 
config variable.</td>
+        </tr>
+        <tr>
             <td><h5>io.tmp.dirs</h5></td>
-            <td style="word-wrap: 
break-word;">System.getProperty("java.io.tmpdir")</td>
+            <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. 
'_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in 
standalone.</td>
             <td></td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 7d99fbb..00c4c38 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -729,6 +729,32 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
                }
        }
 
+       /**
+        * Removes given config option from the configuration.
+        *
+        * @param configOption config option to remove
+        * @param <T> Type of the config option
+        * @return true is config has been removed, false otherwise
+        */
+       public <T> boolean removeConfig(ConfigOption<T> configOption){
+               synchronized (this.confData){
+                       // try the current key
+                       Object oldValue = 
this.confData.remove(configOption.key());
+                       if (oldValue == null){
+                               for (String deprecatedKey : 
configOption.deprecatedKeys()){
+                                       oldValue = 
this.confData.remove(deprecatedKey);
+                                       if (oldValue != null){
+                                               LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
+                                                       deprecatedKey, 
configOption.key());
+                                               return true;
+                                       }
+                               }
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
 
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 42c1c9c..911f399 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -181,12 +181,19 @@ public class CoreOptions {
         * The config parameter defining the directories for temporary files, 
separated by
         * ",", "|", or the system's {@link java.io.File#pathSeparator}.
         */
-       @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+       @Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' 
on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
        public static final ConfigOption<String> TMP_DIRS =
                key("io.tmp.dirs")
                        .defaultValue(System.getProperty("java.io.tmpdir"))
                        .withDeprecatedKeys("taskmanager.tmp.dirs");
 
+       /**
+        * String key, which says if default value is used for `io.tmp.dirs` 
config variable.
+        */
+       public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = 
key("internal.io.tmp.dirs.use-local-default")
+               .defaultValue(true)
+               .withDescription("key, which says if default value is used for 
`io.tmp.dirs` config variable.");
+
        // 
------------------------------------------------------------------------
        //  program
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 7b75c7a..1a637f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -311,6 +311,11 @@ public final class DelegatingConfiguration extends Configuration {
        }
 
        @Override
+       public <T> boolean removeConfig(ConfigOption<T> configOption){
+               return backingConfig.removeConfig(configOption);
+       }
+
+       @Override
        public boolean containsKey(String key) {
                return backingConfig.containsKey(prefix + key);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 232c829..3b98a44 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -303,4 +304,32 @@ public class ConfigurationTest extends TestLogger {
                assertEquals(13, cfg.getInteger(matchesThird));
                assertEquals(-1, cfg.getInteger(notContained));
        }
+
+       @Test
+       public void testRemove(){
+               Configuration cfg = new Configuration();
+               cfg.setInteger("a", 1);
+               cfg.setInteger("b", 2);
+
+               ConfigOption<Integer> validOption = ConfigOptions
+                       .key("a")
+                       .defaultValue(-1);
+
+               ConfigOption<Integer> deprecatedOption = ConfigOptions
+                       .key("c")
+                       .defaultValue(-1)
+                       .withDeprecatedKeys("d", "b");
+
+               ConfigOption<Integer> unexistedOption = ConfigOptions
+                       .key("e")
+                       .defaultValue(-1)
+                       .withDeprecatedKeys("f", "g", "j");
+
+               assertEquals("Wrong expectation about size", 
cfg.keySet().size(), 2);
+               assertTrue("Expected 'validOption' is removed", 
cfg.removeConfig(validOption));
+               assertEquals("Wrong expectation about size", 
cfg.keySet().size(), 1);
+               assertTrue("Expected 'existedOption' is removed", 
cfg.removeConfig(deprecatedOption));
+               assertEquals("Wrong expectation about size", 
cfg.keySet().size(), 0);
+               assertFalse("Expected 'unexistedOption' is not removed", 
cfg.removeConfig(unexistedOption));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 498f435..2059c8e 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,13 +19,13 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
 import 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -173,15 +173,7 @@ public class MesosEntrypointUtils {
                final Map<String, String> envs = System.getenv();
                final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
-               // configure local directory
-               if (configuration.contains(CoreOptions.TMP_DIRS)) {
-                       log.info("Overriding Mesos' temporary file directories 
with those " +
-                               "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-               }
-               else if (tmpDirs != null) {
-                       log.info("Setting directories for temporary files to: 
{}", tmpDirs);
-                       configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
-               }
+               
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);
 
                return configuration;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 7a8403a..3b606f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -250,7 +250,11 @@ public class BootstrapTools {
                        cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlots);
                }
 
-               return cfg; 
+               if 
(baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+                       cfg.removeConfig(CoreOptions.TMP_DIRS);
+               }
+
+               return cfg;
        }
 
        /**
@@ -467,4 +471,22 @@ public class BootstrapTools {
                }
                return template;
        }
+
+       /**
+        * Set temporary configuration directories if necessary
+        *
+        * @param configuration flink config to patch
+        * @param defaultDirs in case no tmp directories is set, next 
directories will be applied
+        */
+       public static void updateTmpDirectoriesInConfiguration(Configuration 
configuration, String defaultDirs){
+               if (configuration.contains(CoreOptions.TMP_DIRS)) {
+                       LOG.info("Overriding Fink's temporary file directories 
with those " +
+                               "specified in the Flink config: {}", 
configuration.getValue(CoreOptions.TMP_DIRS));
+                       
configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
+               }
+               else {
+                       LOG.info("Setting directories for temporary files to: 
{}", defaultDirs);
+                       configuration.setString(CoreOptions.TMP_DIRS, 
defaultDirs);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cf38fea..7e31c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -276,4 +276,23 @@ public class BootstrapToolsTest {
                                        true, true, true, this.getClass()));
 
        }
+
+       @Test
+       public void testUpdateTmpDirectoriesInConfiguration(){
+               Configuration config = new Configuration();
+
+               // test that default value is taken
+               BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
"default/directory/path");
+               assertEquals(config.getString(CoreOptions.TMP_DIRS), 
"default/directory/path");
+
+               // test that we ignore default value is value is set before
+               BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
"not/default/directory/path");
+               assertEquals(config.getString(CoreOptions.TMP_DIRS), 
"default/directory/path");
+
+               //test that empty value is not a magic string
+               config.setString(CoreOptions.TMP_DIRS, "");
+               BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
"some/new/path");
+               assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb977bf..497ac87 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -523,16 +522,8 @@ public class YarnApplicationMasterRunner {
                        ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
                        
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
-               // configure local directory
-               if (configuration.contains(CoreOptions.TMP_DIRS)) {
-                       log.info("Overriding YARN's temporary file directories 
with those " +
-                               "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-               }
-               else {
-                       final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-                       log.info("Setting directories for temporary files to: 
{}", localDirs);
-                       configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-               }
+               final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+               
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
                return configuration;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 572e6ba..6bea822 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -470,14 +471,19 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
                                taskManagerParameters.taskManagerHeapSizeMB(),
                                
taskManagerParameters.taskManagerDirectMemoryLimitMB());
 
-               log.debug("TaskManager configuration: {}", flinkConfig);
+               Configuration taskManagerConfig = flinkConfig.clone();
+               if 
(flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+                       taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
+               }
+
+               log.debug("TaskManager configuration: {}", taskManagerConfig);
 
                ContainerLaunchContext taskExecutorLaunchContext = 
Utils.createTaskExecutorContext(
-                       flinkConfig,
+                       taskManagerConfig,
                        yarnConfig,
                        env,
                        taskManagerParameters,
-                       flinkConfig,
+                       taskManagerConfig,
                        currDir,
                        YarnTaskExecutorRunner.class,
                        log);

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 0c676e7..4102158 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,15 +100,7 @@ public class YarnTaskExecutorRunner {
                        final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
                        FileSystem.initialize(configuration);
 
-                       // configure local directory
-                       if (configuration.contains(CoreOptions.TMP_DIRS)) {
-                               LOG.info("Overriding YARN's temporary file 
directories with those " +
-                                       "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-                       }
-                       else {
-                               LOG.info("Setting directories for temporary 
files to: {}", localDirs);
-                               configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-                       }
+                       
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
                        // tell akka to die in case of an error
                        
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
index d6f2364..9a01075 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -21,8 +21,8 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -121,15 +121,7 @@ public class YarnTaskManagerRunnerFactory {
                final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
                LOG.info("TM: remoteKeytabPrincipal obtained {}", 
remoteKeytabPrincipal);
 
-               // configure local directory
-               if (configuration.contains(CoreOptions.TMP_DIRS)) {
-                       LOG.info("Overriding YARN's temporary file directories 
with those " +
-                               "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-               }
-               else {
-                       LOG.info("Setting directories for temporary files to: 
{}", localDirs);
-                       configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-               }
+               
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
                // tell akka to die in case of an error
                configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);

http://git-wip-us.apache.org/repos/asf/flink/blob/b505e52e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 49957e1..5566963 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -130,16 +129,8 @@ public class YarnEntrypointUtils {
                        
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
                }
 
-               // configure local directory
-               if (configuration.contains(CoreOptions.TMP_DIRS)) {
-                       log.info("Overriding YARN's temporary file directories 
with those " +
-                               "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-               }
-               else {
-                       final String localDirs = 
env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
-                       log.info("Setting directories for temporary files to: 
{}", localDirs);
-                       configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-               }
+               final String localDirs = 
env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
+               
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
                return configuration;
        }

Reply via email to