Repository: spark
Updated Branches:
  refs/heads/master 8f7308f9c -> 38112905b


[SPARK-5479] [YARN] Handle --py-files correctly in YARN.

The bug description is a little misleading: the actual issue is that
.py files are not handled correctly when distributed by YARN. They're
added to "spark.submit.pyFiles", which, when processed by context.py,
explicitly whitelists certain extensions (see PACKAGE_EXTENSIONS),
and that does not include .py files.
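
To illustrate the filtering described above, here is a minimal Scala
paraphrase of the extension check. It is a sketch only: the real check
lives in PySpark's context.py, and the object name, method name, and
the extension set below are assumptions made for illustration.

```scala
object PyFilesFilterSketch {
  // Assumed stand-in for PACKAGE_EXTENSIONS; the values are illustrative.
  val PackageExtensions: Set[String] = Set(".zip", ".egg", ".jar")

  // Returns the file names that would pass the extension whitelist.
  // Plain .py files are dropped, which is the behavior described above.
  def addedToSysPath(pyFiles: String): Seq[String] =
    pyFiles.split(",").toSeq
      .filter(_.nonEmpty)
      .map(path => path.substring(path.lastIndexOf('/') + 1))
      .filter(name => PackageExtensions.contains(name.takeRight(4).toLowerCase))
}
```

With an input such as "deps.zip,helper.py,lib.egg", helper.py is
filtered out, so it never becomes importable.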

On top of that, archives were not handled at all! They made it to the
driver's python path, but never made it to executors, since the mechanism
used to propagate their location (spark.submit.pyFiles) only works on
the driver side.

So, instead, ignore "spark.submit.pyFiles" and just build PYTHONPATH
correctly for both driver and executors. Individual .py files are
placed in a subdirectory of the container's local dir in the cluster,
which is then added to the python path. Archives are added directly.
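
The PYTHONPATH construction described above can be sketched roughly as
follows. This is a simplified illustration, not the code in
Client.scala: the "{{PWD}}" placeholder, the "__pyfiles__" directory
name, and the ":" separator are assumed stand-ins for the values the
YARN client actually uses.

```scala
import java.net.URI

object PythonPathSketch {
  // Hypothetical stand-ins for YARN's working-directory variable and
  // for the subdirectory that holds individual .py files.
  val PwdPlaceholder = "{{PWD}}"
  val PythonDir = "__pyfiles__"

  def buildPythonPath(pyFiles: Seq[String], sep: String = ":"): String = {
    // Individual .py files share one directory; archives are listed directly.
    val (plainFiles, archives) = pyFiles.partition(_.endsWith(".py"))
    val entries = Seq.newBuilder[String]
    if (plainFiles.nonEmpty) {
      entries += s"$PwdPlaceholder/$PythonDir"
    }
    archives.foreach { path =>
      val uri = new URI(path)
      if (uri.getScheme == "local") {
        // "local:" resources already exist on every node; use the path as is.
        entries += uri.getPath
      } else {
        // Distributed archives are linked by name into the container dir.
        entries += s"$PwdPlaceholder/${path.substring(path.lastIndexOf('/') + 1)}"
      }
    }
    entries.result().mkString(sep)
  }
}
```

Note that any number of .py files contributes a single path entry,
while each archive gets its own entry.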

The change, as a side effect, ends up solving the symptom described
in the bug. The issue was not that the files were not being distributed,
but that they were never made visible to the python application
running under Spark.

Also included is a proper unit test for running python on YARN, which
broke in several different ways with the previous code.

A short walkthrough of the changes:
- SparkSubmit does not try to be smart about how YARN handles python
  files anymore. It just passes down the configs to the YARN client
  code.
- The YARN client distributes python files and archives differently,
  placing the files in a subdirectory.
- The YARN client now sets PYTHONPATH for the processes it launches.
  To properly handle different locations, it uses YARN's support for
  embedding env variables. To avoid YARN expanding those at the wrong
  time, SparkConf is now propagated to the AM using a conf file
  instead of command-line options.
- Because the Client initialization code is a maze of implicit
  dependencies, some code needed to be moved around to make sure
  all needed state was available when the code ran.
- The pyspark tests in YarnClusterSuite now actually distribute and try
  to use both a python file and an archive containing a different python
  module. Also added yarn-client tests for completeness.
- I cleaned up some of the code around distributing files to YARN, to
  avoid adding more copied & pasted code to handle the new files being
  distributed.
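
As a rough illustration of the conf-file approach mentioned in the list
above, the sketch below round-trips a configuration map through
java.util.Properties. The object and method names are hypothetical; the
point is only that values containing unexpanded variables or quotes
survive unchanged, with no shell escaping involved.

```scala
import java.io.{StringReader, StringWriter}
import java.util.Properties

object ConfFileSketch {
  // Write the configuration in java.util.Properties text format.
  def serialize(conf: Map[String, String]): String = {
    val props = new Properties()
    conf.foreach { case (k, v) => props.setProperty(k, v) }
    val out = new StringWriter()
    props.store(out, "Spark configuration.")
    out.toString
  }

  // Read the configuration back, as a receiving process might.
  def deserialize(text: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(text))
    var result = Map.empty[String, String]
    val names = props.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      result += (key -> props.getProperty(key))
    }
    result
  }
}
```

A caller would serialize on the client side and deserialize in the
launched process, for example setting each entry as a system property.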

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #6360 from vanzin/SPARK-5479 and squashes the following commits:

bcaf7e6 [Marcelo Vanzin] Feedback.
c47501f [Marcelo Vanzin] Fix yarn-client mode.
46b1d0c [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
c743778 [Marcelo Vanzin] Only pyspark cares about python archives.
c8e5a82 [Marcelo Vanzin] Actually run pyspark in client mode.
705571d [Marcelo Vanzin] Move some code to the YARN module.
1dd4d0c [Marcelo Vanzin] Review feedback.
71ee736 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
220358b [Marcelo Vanzin] Scalastyle.
cdbb990 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
7fe3cd4 [Marcelo Vanzin] No need to distribute primary file to executors.
09045f1 [Marcelo Vanzin] Style.
943cbf4 [Marcelo Vanzin] [SPARK-5479] [yarn] Handle --py-files correctly in YARN.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38112905
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38112905
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38112905

Branch: refs/heads/master
Commit: 38112905bc3b33f2ae75274afba1c30e116f6e46
Parents: 8f7308f
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Jun 10 13:17:29 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Jun 10 13:17:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |  77 ++---
 .../spark/deploy/yarn/ApplicationMaster.scala   |  20 +-
 .../yarn/ApplicationMasterArguments.scala       |  12 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 295 ++++++++++++-------
 .../spark/deploy/yarn/ClientArguments.scala     |   4 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   5 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  |   4 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala    |  61 ++--
 8 files changed, 270 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a0eae77..b8978e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
+      }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }
 
-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }
-
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
       }
     }
 
-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@ object SparkSubmit {
       }
     }
 
+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
         if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
         }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 002d7b6..83dafa4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.rpc._
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -46,6 +46,14 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
+  // Load the properties file with the Spark configuration and set entries as system properties,
+  // so that user code run inside the AM also has access to them.
+  if (args.propertiesFile != null) {
+    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+  }
+
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
@@ -490,9 +498,11 @@ private[spark] class ApplicationMaster(
         new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
       }
 
+    var userArgs = args.userArgs
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
-      System.setProperty("spark.submit.pyFiles",
-        PythonRunner.formatPaths(args.pyFiles).mkString(","))
+      // When running pyspark, the app is run using PythonRunner. The second argument is the list
+      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+      userArgs = Seq(args.primaryPyFile, "") ++ userArgs
     }
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       // TODO(davies): add R dependencies here
@@ -503,9 +513,7 @@ private[spark] class ApplicationMaster(
     val userThread = new Thread {
       override def run() {
         try {
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
+          mainMethod.invoke(null, userArgs.toArray)
           finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
           logDebug("Done running users class")
         } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index ae6dc10..68e9f6b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userClass: String = null
   var primaryPyFile: String = null
   var primaryRFile: String = null
-  var pyFiles: String = null
-  var userArgs: Seq[String] = Seq[String]()
+  var userArgs: Seq[String] = Nil
   var executorMemory = 1024
   var executorCores = 1
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
+  var propertiesFile: String = null
 
   parseArgs(args.toList)
 
@@ -59,10 +59,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
           primaryRFile = value
           args = tail
 
-        case ("--py-files") :: value :: tail =>
-          pyFiles = value
-          args = tail
-
         case ("--args" | "--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
@@ -79,6 +75,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
           executorCores = value
           args = tail
 
+        case ("--properties-file") :: value :: tail =>
+          propertiesFile = value
+          args = tail
+
         case _ =>
           printUsageAndExit(1, args)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f4d4321..ec9402a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
+  OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
-import java.util.UUID
+import java.util.{Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConversions._
@@ -29,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.reflect.runtime.universe
 import scala.util.{Try, Success, Failure}
 
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.base.Objects
 import com.google.common.io.Files
 
@@ -247,7 +249,9 @@ private[spark] class Client(
    * This is used for setting up a container launch context for our ApplicationMaster.
    * Exposed for testing.
    */
-  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+  def prepareLocalResources(
+      appStagingDir: String,
+      pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
     logInfo("Preparing resources for our AM container")
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
@@ -277,20 +281,6 @@ private[spark] class Client(
           "for alternatives.")
     }
 
-    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
-    // HDFS, and setup the relevant environment vars, so the AM can login again.
-    if (loginFromKeytab) {
-      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
-        " via the YARN Secure Distributed Cache.")
-      val localUri = new URI(args.keytab)
-      val localPath = getQualifiedLocalPath(localUri, hadoopConf)
-      val destinationPath = copyFileToRemote(dst, localPath, replication)
-      val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
-      distCacheMgr.addResource(
-        destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
-        sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -303,6 +293,57 @@ private[spark] class Client(
     }
 
     /**
+     * Distribute a file to the cluster.
+     *
+     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+     * to HDFS (if not already there) and added to the application's distributed cache.
+     *
+     * @param path URI of the file to distribute.
+     * @param resType Type of resource being distributed.
+     * @param destName Name of the file in the distributed cache.
+     * @param targetDir Subdirectory where to place the file.
+     * @param appMasterOnly Whether to distribute only to the AM.
+     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+     *         localized path for non-local paths, or the input `path` for local paths.
+     *         The localized path will be null if the URI has already been added to the cache.
+     */
+    def distribute(
+        path: String,
+        resType: LocalResourceType = LocalResourceType.FILE,
+        destName: Option[String] = None,
+        targetDir: Option[String] = None,
+        appMasterOnly: Boolean = false): (Boolean, String) = {
+      val localURI = new URI(path.trim())
+      if (localURI.getScheme != LOCAL_SCHEME) {
+        if (addDistributedUri(localURI)) {
+          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+          val linkname = targetDir.map(_ + "/").getOrElse("") +
+            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+          val destPath = copyFileToRemote(dst, localPath, replication)
+          distCacheMgr.addResource(
+            fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+            appMasterOnly = appMasterOnly)
+          (false, linkname)
+        } else {
+          (false, null)
+        }
+      } else {
+        (true, path.trim())
+      }
+    }
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and setup the relevant environment vars, so the AM can login again.
+    if (loginFromKeytab) {
+      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+        " via the YARN Secure Distributed Cache.")
+      val (_, localizedPath) = distribute(args.keytab,
+        destName = Some(sparkConf.get("spark.yarn.keytab")),
+        appMasterOnly = true)
+      require(localizedPath != null, "Keytab file already distributed.")
+    }
+
+    /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
      * Each resource is represented by a 3-tuple of:
@@ -314,33 +355,18 @@ private[spark] class Client(
       (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
       (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
       ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, _localPath, confKey) =>
-      val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (!localPath.isEmpty()) {
-        val localURI = new URI(localPath)
-        if (localURI.getScheme != LOCAL_SCHEME) {
-          if (addDistributedUri(localURI)) {
-            val src = getQualifiedLocalPath(localURI, hadoopConf)
-            val destPath = copyFileToRemote(dst, src, replication)
-            val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-            distCacheMgr.addResource(destFs, hadoopConf, destPath,
-              localResources, LocalResourceType.FILE, destName, statCache)
-          }
-        } else if (confKey != null) {
+    ).foreach { case (destName, path, confKey) =>
+      if (path != null && !path.trim().isEmpty()) {
+        val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
+        if (isLocal && confKey != null) {
+          require(localizedPath != null, s"Path $path already distributed.")
           // If the resource is intended for local use only, handle this downstream
           // by setting the appropriate property
-          sparkConf.set(confKey, localPath)
+          sparkConf.set(confKey, localizedPath)
         }
       }
     }
 
-    createConfArchive().foreach { file =>
-      require(addDistributedUri(file.toURI()))
-      val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
-      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
-        LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
-    }
-
     /**
      * Do the same for any additional resources passed in through ClientArguments.
      * Each resource category is represented by a 3-tuple of:
@@ -356,21 +382,10 @@ private[spark] class Client(
     ).foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { file =>
-          val localURI = new URI(file.trim())
-          if (localURI.getScheme != LOCAL_SCHEME) {
-            if (addDistributedUri(localURI)) {
-              val localPath = new Path(localURI)
-              val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-              val destPath = copyFileToRemote(dst, localPath, replication)
-              distCacheMgr.addResource(
-                fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-              if (addToClasspath) {
-                cachedSecondaryJarLinks += linkname
-              }
-            }
-          } else if (addToClasspath) {
-            // Resource is intended for local use only and should be added to the class path
-            cachedSecondaryJarLinks += file.trim()
+          val (_, localizedPath) = distribute(file, resType = resType)
+          require(localizedPath != null)
+          if (addToClasspath) {
+            cachedSecondaryJarLinks += localizedPath
           }
         }
       }
@@ -379,11 +394,31 @@ private[spark] class Client(
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }
 
+    if (isClusterMode && args.primaryPyFile != null) {
+      distribute(args.primaryPyFile, appMasterOnly = true)
+    }
+
+    pySparkArchives.foreach { f => distribute(f) }
+
+    // The python files list needs to be treated especially. All files that are not an
+    // archive need to be placed in a subdirectory that will be added to PYTHONPATH.
+    args.pyFiles.foreach { f =>
+      val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
+      distribute(f, targetDir = targetDir)
+    }
+
+    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    val (_, confLocalizedPath) = distribute(createConfArchive().getAbsolutePath(),
+      resType = LocalResourceType.ARCHIVE,
+      destName = Some(LOCALIZED_CONF_DIR),
+      appMasterOnly = true)
+    require(confLocalizedPath != null)
+
     localResources
   }
 
   /**
-   * Create an archive with the Hadoop config files for distribution.
+   * Create an archive with the config files for distribution.
    *
    * These are only used by the AM, since executors will use the configuration object broadcast by
    * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
@@ -395,8 +430,11 @@ private[spark] class Client(
    *
    * Currently this makes a shallow copy of the conf directory. If there are cases where a
    * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   *
+   * The archive also contains some Spark configuration. Namely, it saves the contents of
+   * SparkConf in a file to be loaded by the AM process.
    */
-  private def createConfArchive(): Option[File] = {
+  private def createConfArchive(): File = {
     val hadoopConfFiles = new HashMap[String, File]()
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
@@ -411,28 +449,32 @@ private[spark] class Client(
       }
     }
 
-    if (!hadoopConfFiles.isEmpty) {
-      val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
-        new File(Utils.getLocalDir(sparkConf)))
+    val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
+      new File(Utils.getLocalDir(sparkConf)))
+    val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
 
-      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
-      try {
-        hadoopConfStream.setLevel(0)
-        hadoopConfFiles.foreach { case (name, file) =>
-          if (file.canRead()) {
-            hadoopConfStream.putNextEntry(new ZipEntry(name))
-            Files.copy(file, hadoopConfStream)
-            hadoopConfStream.closeEntry()
-          }
+    try {
+      confStream.setLevel(0)
+      hadoopConfFiles.foreach { case (name, file) =>
+        if (file.canRead()) {
+          confStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, confStream)
+          confStream.closeEntry()
         }
-      } finally {
-        hadoopConfStream.close()
       }
 
-      Some(hadoopConfArchive)
-    } else {
-      None
+      // Save Spark configuration to a file in the archive.
+      val props = new Properties()
+      sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
+      val writer = new OutputStreamWriter(confStream, UTF_8)
+      props.store(writer, "Spark configuration.")
+      writer.flush()
+      confStream.closeEntry()
+    } finally {
+      confStream.close()
     }
+    confArchive
   }
 
   /**
@@ -460,7 +502,9 @@ private[spark] class Client(
   /**
    * Set up the environment for launching our ApplicationMaster container.
    */
-  private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+  private def setupLaunchEnv(
+      stagingDir: String,
+      pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
@@ -478,9 +522,6 @@ private[spark] class Client(
       val renewalInterval = getTokenRenewalInterval(stagingDirPath)
       sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
     }
-    // Set the environment variables to be passed on to the executors.
-    distCacheMgr.setDistFilesEnv(env)
-    distCacheMgr.setDistArchivesEnv(env)
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
@@ -497,15 +538,32 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
-    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
-    // that can be passed on to the ApplicationMaster and the executors.
-    if (sparkConf.contains("spark.submit.pyArchives")) {
-      var pythonPath = sparkConf.get("spark.submit.pyArchives")
-      if (env.contains("PYTHONPATH")) {
-        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+    // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
+    // of the container processes too. Add all non-.py files directly to PYTHONPATH.
+    //
+    // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
+    val pythonPath = new ListBuffer[String]()
+    val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+    if (pyFiles.nonEmpty) {
+      pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+        LOCALIZED_PYTHON_DIR)
+    }
+    (pySparkArchives ++ pyArchives).foreach { path =>
+      val uri = new URI(path)
+      if (uri.getScheme != LOCAL_SCHEME) {
+        pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+          new Path(path).getName())
+      } else {
+        pythonPath += uri.getPath()
       }
-      env("PYTHONPATH") = pythonPath
-      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
+    // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
+    if (pythonPath.nonEmpty) {
+      val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
+        .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+      env("PYTHONPATH") = pythonPathStr
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
     }
 
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
@@ -555,8 +613,19 @@ private[spark] class Client(
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val launchEnv = setupLaunchEnv(appStagingDir)
+    val pySparkArchives =
+      if (sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) {
+        findPySparkArchives()
+      } else {
+        Nil
+      }
+    val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
+    val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+
+    // Set the environment variables to be passed on to the executors.
+    distCacheMgr.setDistFilesEnv(launchEnv)
+    distCacheMgr.setDistArchivesEnv(launchEnv)
+
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
     amContainer.setEnvironment(launchEnv)
@@ -596,13 +665,6 @@ private[spark] class Client(
       javaOpts += "-XX:CMSIncrementalDutyCycle=10"
     }
 
-    // Forward the Spark configuration to the application master / executors.
-    // TODO: it might be nicer to pass these as an internal environment variable rather than
-    // as Java options, due to complications with string parsing of nested quotes.
-    for ((k, v) <- sparkConf.getAll) {
-      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
-    }
-
     // Include driver-specific java options if we are launching a driver
     if (isClusterMode) {
       val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions")
@@ -655,14 +717,8 @@ private[spark] class Client(
         Nil
       }
     val primaryPyFile =
-      if (args.primaryPyFile != null) {
-        Seq("--primary-py-file", args.primaryPyFile)
-      } else {
-        Nil
-      }
-    val pyFiles =
-      if (args.pyFiles != null) {
-        Seq("--py-files", args.pyFiles)
+      if (isClusterMode && args.primaryPyFile != null) {
+        Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
       } else {
         Nil
       }
@@ -678,9 +734,6 @@ private[spark] class Client(
       } else {
         Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
       }
-    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
-      args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
-    }
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
     }
@@ -688,11 +741,13 @@ private[spark] class Client(
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
         userArgs ++ Seq(
           "--executor-memory", args.executorMemory.toString + "m",
           "--executor-cores", args.executorCores.toString,
-          "--num-executors ", args.numExecutors.toString)
+          "--num-executors ", args.numExecutors.toString,
+          "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+            LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
     // Command for the ApplicationMaster
     val commands = prefixEnv ++ Seq(
@@ -857,6 +912,22 @@ private[spark] class Client(
       }
     }
   }
+
+  private def findPySparkArchives(): Seq[String] = {
+    sys.env.get("PYSPARK_ARCHIVES_PATH")
+      .map(_.split(",").toSeq)
+      .getOrElse {
+        val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator)
+        val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+        require(pyArchivesFile.exists(),
+          "pyspark.zip not found; cannot run pyspark application in YARN mode.")
+        val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+        require(py4jFile.exists(),
+          "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+        Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
+      }
+  }
+
 }
 
 object Client extends Logging {
@@ -907,8 +978,14 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
-  // Subdirectory where the user's hadoop config files will be placed.
-  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+  // Subdirectory where the user's Spark and Hadoop config files will be placed.
+  val LOCALIZED_CONF_DIR = "__spark_conf__"
+
+  // Name of the file in the conf archive containing Spark configuration.
+  val SPARK_CONF_FILE = "__spark_conf__.properties"
+
+  // Subdirectory where the user's python files (not archives) will be placed.
+  val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
@@ -1033,7 +1110,7 @@ object Client extends Logging {
     if (isAM) {
       addClasspathEntry(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-          LOCALIZED_HADOOP_CONF_DIR, env)
+          LOCALIZED_CONF_DIR, env)
     }
 
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
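
The PYTHONPATH assembly in the Client.scala changes above (any PYTHONPATH already set in the submitting environment comes first, followed by the newly collected entries) can be illustrated with a small Python sketch. This is not the patch's code: `build_python_path` is a hypothetical helper, `os.pathsep` stands in for whatever separator the YARN client uses, and dropping an empty existing value is an assumption of the sketch.

```python
import os


def build_python_path(new_entries, env=None, separator=os.pathsep):
    """Join an existing PYTHONPATH (if any) with new entries, existing value first.

    Hypothetical helper for illustration only; the separator default is an
    assumption, not the value used by the YARN client.
    """
    env = os.environ if env is None else env
    existing = env.get("PYTHONPATH")
    # Assumption of this sketch: an unset or empty PYTHONPATH contributes nothing.
    parts = ([existing] if existing else []) + list(new_entries)
    return separator.join(parts)


# Example: one directory holding loose .py files plus one archive.
print(build_python_path(["__pyfiles__", "mod2.zip"],
                        env={"PYTHONPATH": "/opt/py"},
                        separator=":"))
```

The same string would then be exported to both the launching process and the executors, which is what the two assignments in the patch appear to do.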

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9c7b1b3..35e9906 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var archives: String = null
   var userJar: String = null
   var userClass: String = null
-  var pyFiles: String = null
+  var pyFiles: Seq[String] = Nil
   var primaryPyFile: String = null
   var primaryRFile: String = null
   var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
@@ -228,7 +228,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           args = tail
 
         case ("--py-files") :: value :: tail =>
-          pyFiles = value
+          pyFiles = value.split(",")
           args = tail
 
         case ("--files") :: value :: tail =>
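
With `--py-files` now parsed into a list, the commit message says plain `.py` files and archives are treated differently (files go into a subdirectory, archives are added to the path directly). A rough Python sketch of that partitioning follows; `split_py_files` is a hypothetical name, and skipping empty entries is an assumption of the sketch rather than something the patch's `value.split(",")` does.

```python
def split_py_files(value):
    """Split a comma-separated --py-files value into (plain_files, archives).

    Hypothetical helper for illustration: entries ending in ".py" are treated
    as loose modules, everything else as an archive.
    """
    # Assumption of this sketch: empty entries (e.g. from "a.py,,b.zip") are skipped.
    entries = [e for e in value.split(",") if e]
    plain_files = [e for e in entries if e.endswith(".py")]
    archives = [e for e in entries if not e.endswith(".py")]
    return plain_files, archives


files, archives = split_py_files("/tmp/mod1.py,/tmp/mod2.zip")
print(files, archives)
```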

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 99c0532..1c8d7ec 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -76,7 +76,8 @@ private[spark] class YarnClientSchedulerBackend(
         ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
         ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
         ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue")
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--py-files", null, "spark.submit.pyFiles")
       )
     // Warn against the following deprecated environment variables: env var -> suggestion
     val deprecatedEnvVars = Map(
@@ -86,7 +87,7 @@ private[spark] class YarnClientSchedulerBackend(
     optionTuples.foreach { case (optionName, envVar, sparkProp) =>
       if (sc.getConf.contains(sparkProp)) {
         extraArgs += (optionName, sc.getConf.get(sparkProp))
-      } else if (System.getenv(envVar) != null) {
+      } else if (envVar != null && System.getenv(envVar) != null) {
         extraArgs += (optionName, System.getenv(envVar))
         if (deprecatedEnvVars.contains(envVar)) {
           logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 01d33c9..4ec976a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -113,7 +113,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
         Environment.PWD.$()
       }
     cp should contain(pwdVar)
-    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
+    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }
@@ -129,7 +129,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
 
     val tempDir = Utils.createTempDir()
     try {
-      client.prepareLocalResources(tempDir.getAbsolutePath())
+      client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
       sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
 
       // The non-local path should be propagated by name only, since it will end up in the app's

http://git-wip-us.apache.org/repos/asf/spark/blob/38112905/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 93d587d..a0f25ba 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -56,6 +56,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     """.stripMargin
 
   private val TEST_PYFILE = """
+    |import mod1, mod2
     |import sys
     |from operator import add
     |
@@ -67,7 +68,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     |    sc = SparkContext(conf=SparkConf())
     |    status = open(sys.argv[1],'w')
     |    result = "failure"
-    |    rdd = sc.parallelize(range(10))
+    |    rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
     |    cnt = rdd.count()
     |    if cnt == 10:
     |        result = "success"
@@ -76,6 +77,11 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     |    sc.stop()
     """.stripMargin
 
+  private val TEST_PYMODULE = """
+    |def func():
+    |    return 42
+    """.stripMargin
+
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
@@ -124,7 +130,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
-    hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+    hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
     assert(hadoopConfDir.mkdir())
     File.createTempFile("token", ".txt", hadoopConfDir)
   }
@@ -151,26 +157,12 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     }
   }
 
-  // Enable this once fix SPARK-6700
-  test("run Python application in yarn-cluster mode") {
-    val primaryPyFile = new File(tempDir, "test.py")
-    Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
-    val pyFile = new File(tempDir, "test2.py")
-    Files.write(TEST_PYFILE, pyFile, UTF_8)
-    var result = File.createTempFile("result", null, tempDir)
+  test("run Python application in yarn-client mode") {
+    testPySpark(true)
+  }
 
-    // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
-    // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
-    val sparkHome = sys.props("spark.test.home")
-    val extraConf = Map(
-      "spark.executorEnv.SPARK_HOME" -> sparkHome,
-      "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
-
-    runSpark(false, primaryPyFile.getAbsolutePath(),
-      sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
-      appArgs = Seq(result.getAbsolutePath()),
-      extraConf = extraConf)
-    checkResult(result)
+  test("run Python application in yarn-cluster mode") {
+    testPySpark(false)
   }
 
   test("user class path first in client mode") {
@@ -188,6 +180,33 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     checkResult(result)
   }
 
+  private def testPySpark(clientMode: Boolean): Unit = {
+    val primaryPyFile = new File(tempDir, "test.py")
+    Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
+
+    val moduleDir =
+      if (clientMode) {
+        // In client-mode, .py files added with --py-files are not visible in the driver.
+        // This is something that the launcher library would have to handle.
+        tempDir
+      } else {
+        val subdir = new File(tempDir, "pyModules")
+        subdir.mkdir()
+        subdir
+      }
+    val pyModule = new File(moduleDir, "mod1.py")
+    Files.write(TEST_PYMODULE, pyModule, UTF_8)
+
+    val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
+    val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
+    val result = File.createTempFile("result", null, tempDir)
+
+    runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+      sparkArgs = Seq("--py-files", pyFiles),
+      appArgs = Seq(result.getAbsolutePath()))
+    checkResult(result)
+  }
+
   private def testUseClassPathFirst(clientMode: Boolean): Unit = {
     // Create a jar file that contains a different version of "test.resource".
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)

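The new test ships `mod2.py` inside an archive and expects `import mod2` to work once the archive is on the python path. That relies on standard Python behaviour: a zip-format archive listed on `sys.path` is importable. The sketch below demonstrates this outside Spark; the module name `mod2_sketch` and the helper `import_from_archive` are made up for the example and are not part of the patch.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def import_from_archive(archive_path, module_name):
    """Put a zip archive on sys.path and import a top-level module from it."""
    if archive_path not in sys.path:
        sys.path.insert(0, archive_path)
    # Make sure the import system notices the newly created archive.
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


# Build a throwaway archive containing one module, mirroring TEST_PYMODULE.
workdir = tempfile.mkdtemp()
archive = os.path.join(workdir, "mod2_sketch.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("mod2_sketch.py", "def func():\n    return 42\n")

mod = import_from_archive(archive, "mod2_sketch")
print(mod.func())
```

A loose `.py` file, by contrast, cannot be put on the path itself; its containing directory has to be, which is presumably why the patch stages such files in a dedicated subdirectory.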
