Repository: spark
Updated Branches:
  refs/heads/master 1a5e46076 -> 51e2b38d9


[SPARK-24992][CORE] spark should randomize yarn local dir selection

**Description: 
[SPARK-24992](https://issues.apache.org/jira/browse/SPARK-24992)**
Utils.getLocalDir is used to get the path of a temporary directory. However, it 
always returns the same directory: the first element of the localRootDirs 
array. When running on YARN, this can cause all writes to land on one disk, 
keeping it busy while the other disks sit idle. We should randomize the 
selection to spread out the load.
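The before/after selection logic can be sketched as a standalone snippet (the directory paths below are made up for illustration; in Spark the array comes from getOrCreateLocalRootDirs):

```scala
// Hypothetical local root dirs, standing in for what Spark derives from
// the YARN-provided LOCAL_DIRS / spark.local.dir configuration.
val localRootDirs = Array("/disk1/tmp", "/disk2/tmp", "/disk3/tmp")

// Before: headOption always yields the first configured directory,
// so every temp write targets /disk1/tmp.
val before = localRootDirs.headOption.getOrElse(sys.props("java.io.tmpdir"))

// After: pick an index uniformly at random so load spreads across disks.
val after = localRootDirs(scala.util.Random.nextInt(localRootDirs.length))
```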

**What changes were proposed in this pull request?**
This PR randomizes the selection of the local directory inside 
Utils.getLocalDir. This change affects Utils.fetchFile, which relied on 
Utils.getLocalDir always returning the same directory for its file cache. 
Therefore, a new variable, cachedLocalDir, is used to cache the first local 
directory obtained from Utils.getLocalDir. Also, when getting the configured 
local directories (inside Utils.getConfiguredLocalDirs) in YARN mode, the 
array of directories is randomized before being returned.
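The caching described above follows the classic volatile double-checked-locking pattern, so the synchronized block is entered at most a handful of times even under concurrent fetches. A minimal standalone sketch (FetchCache and the pick parameter are illustrative names, not Spark's actual API):

```scala
object FetchCache {
  // Volatile field plus double-checked locking: the fast path is a
  // lock-free read; only the first caller(s) take the lock.
  @volatile private var cachedLocalDir: String = ""

  def cachedDir(pick: () => String): String = {
    if (cachedLocalDir.isEmpty) {
      this.synchronized {
        // Re-check under the lock: another thread may have won the race.
        if (cachedLocalDir.isEmpty) {
          cachedLocalDir = pick() // getLocalDir(conf) in the real code
        }
      }
    }
    cachedLocalDir
  }
}
```

Because the first result is pinned, fetchFile keeps its invariant of a single stable cache directory even though getLocalDir itself is now randomized.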

Author: Hieu Huynh <“hieu.hu...@oath.com”>

Closes #21953 from hthuynh2/SPARK_24992.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51e2b38d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51e2b38d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51e2b38d

Branch: refs/heads/master
Commit: 51e2b38d93df8cb0cc151d5e68a2190eab52644c
Parents: 1a5e460
Author: Hieu Huynh <“hieu.hu...@oath.com”>
Authored: Mon Aug 6 13:58:28 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Mon Aug 6 13:58:28 2018 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 21 ++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51e2b38d/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a6fd363..7ec707d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -83,6 +83,7 @@ private[spark] object Utils extends Logging {
   val random = new Random()
 
   private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+  @volatile private var cachedLocalDir: String = ""
 
   /**
   * Define a default value for driver memory here since this value is referenced across the code
@@ -462,7 +463,15 @@ private[spark] object Utils extends Logging {
     if (useCache && fetchCacheEnabled) {
       val cachedFileName = s"${url.hashCode}${timestamp}_cache"
       val lockFileName = s"${url.hashCode}${timestamp}_lock"
-      val localDir = new File(getLocalDir(conf))
+      // Set the cachedLocalDir for the first time and re-use it later
+      if (cachedLocalDir.isEmpty) {
+        this.synchronized {
+          if (cachedLocalDir.isEmpty) {
+            cachedLocalDir = getLocalDir(conf)
+          }
+        }
+      }
+      val localDir = new File(cachedLocalDir)
       val lockFile = new File(localDir, lockFileName)
       val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
       // Only one executor entry.
@@ -767,13 +776,17 @@ private[spark] object Utils extends Logging {
    *   - Otherwise, this will return java.io.tmpdir.
    *
   * Some of these configuration options might be lists of multiple paths, but this method will
-   * always return a single directory.
+   * always return a single directory. The return directory is chosen randomly from the array
+   * of directories it gets from getOrCreateLocalRootDirs.
    */
   def getLocalDir(conf: SparkConf): String = {
-    getOrCreateLocalRootDirs(conf).headOption.getOrElse {
+    val localRootDirs = getOrCreateLocalRootDirs(conf)
+    if (localRootDirs.isEmpty) {
       val configuredLocalDirs = getConfiguredLocalDirs(conf)
       throw new IOException(
        s"Failed to get a temp directory under [${configuredLocalDirs.mkString(",")}].")
+    } else {
+      localRootDirs(scala.util.Random.nextInt(localRootDirs.length))
     }
   }
 
@@ -815,7 +828,7 @@ private[spark] object Utils extends Logging {
      // to what Yarn on this system said was available. Note this assumes that Yarn has
      // created the directories already, and that they are secured so that only the
      // user has access to them.
-      getYarnLocalDirs(conf).split(",")
+      randomizeInPlace(getYarnLocalDirs(conf).split(","))
     } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
       conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
     } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
