Repository: spark
Updated Branches:
  refs/heads/master 2a948e7e1 -> dba314029


[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode.

Send secondary jars to the distributed cache of all containers and add the cached 
jars to the classpath before executors start. Tested on a YARN cluster (CDH-5.0).

`spark-submit --jars` also works in standalone server and `yarn-client`. Thanks 
to @andrewor14 for testing!

I removed "Doesn't work for drivers in standalone mode with "cluster" deploy 
mode." from `spark-submit`'s help message, though we haven't tested mesos yet.

CC: @dbtsai @sryza

Author: Xiangrui Meng <m...@databricks.com>

Closes #848 from mengxr/yarn-classpath and squashes the following commits:

23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to 
__app__.jar to avoid conflicts; append $CWD/ and $CWD/* to the classpath; remove 
unused methods
a40f6ed [Xiangrui Meng] standalone -> cluster
65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for 
yarn-client
11e5354 [Xiangrui Meng] minor changes
3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf
dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba31402
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba31402
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba31402

Branch: refs/heads/master
Commit: dba314029b4c9d72d7e48a2093b39edd01931f57
Parents: 2a948e7
Author: Xiangrui Meng <m...@databricks.com>
Authored: Thu May 22 01:52:50 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu May 22 01:52:50 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/SparkSubmitArguments.scala     |  3 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   | 69 +++++---------------
 .../cluster/YarnClientSchedulerBackend.scala    |  2 +-
 3 files changed, 19 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dba31402/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 264d454..0cc05fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -326,8 +326,7 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
         |  --class CLASS_NAME          Your application's main class (for Java 
/ Scala apps).
         |  --name NAME                 A name of your application.
         |  --jars JARS                 Comma-separated list of local jars to 
include on the driver
-        |                              and executor classpaths. Doesn't work 
for drivers in
-        |                              standalone mode with "cluster" deploy 
mode.
+        |                              and executor classpaths.
         |  --py-files PY_FILES         Comma-separated list of .zip or .egg 
files to place on the
         |                              PYTHONPATH for Python apps.
         |  --files FILES               Comma-separated list of files to be 
placed in the working

http://git-wip-us.apache.org/repos/asf/spark/blob/dba31402/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 27a518c..aeb3f00 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
  * Client submits an application to the YARN ResourceManager.
  *
  * Depending on the deployment mode this will launch one of two application 
master classes:
- * 1. In standalone mode, it will launch an 
[[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * 1. In cluster mode, it will launch an 
[[org.apache.spark.deploy.yarn.ApplicationMaster]]
  *      which launches a driver program inside of the cluster.
  * 2. In client mode, it will launch an 
[[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
  *      request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
       }
     }
 
+    var cachedSecondaryJarLinks = ListBuffer.empty[String]
     val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
       (args.files, LocalResourceType.FILE, false),
       (args.archives, LocalResourceType.ARCHIVE, false) )
-    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+    fileLists.foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { case file: String =>
           val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
             val linkname = 
Option(localURI.getFragment()).getOrElse(localPath.getName())
             val destPath = copyRemoteFile(dst, localPath, replication)
             distCacheMgr.addResource(fs, conf, destPath, localResources, 
resType,
-              linkname, statCache, appMasterOnly)
+              linkname, statCache)
+            if (addToClasspath) {
+              cachedSecondaryJarLinks += linkname
+            }
           }
         }
       }
     }
+    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, 
cachedSecondaryJarLinks.mkString(","))
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
 }
 
 object ClientBase {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
+  val SPARK_JAR: String = "__spark__.jar"
+  val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
   val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
+  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
 
   def getSparkJar = 
sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
 
@@ -479,67 +485,26 @@ object ClientBase {
 
     extraClassPath.foreach(addClasspathEntry)
 
-    addClasspathEntry(Environment.PWD.$())
+    val cachedSecondaryJarLinks =
+      
sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
     } else {
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
     }
+    // Append all class files and jar files under the working directory to the 
classpath.
+    addClasspathEntry(Environment.PWD.$())
     addPwdClasspathEntry("*")
   }
 
   /**
-   * Adds the user jars which have local: URIs (or alternate names, such as 
APP_JAR) explicitly
-   * to the classpath.
-   */
-  private def addUserClasspath(args: ClientArguments, env: HashMap[String, 
String]) = {
-    if (args != null) {
-      addClasspathEntry(args.userJar, APP_JAR, env)
-    }
-
-    if (args != null && args.addJars != null) {
-      args.addJars.split(",").foreach { case file: String =>
-        addClasspathEntry(file, null, env)
-      }
-    }
-  }
-
-  /**
-   * Adds the given path to the classpath, handling "local:" URIs correctly.
-   *
-   * If an alternate name for the file is given, and it's not a "local:" file, 
the alternate
-   * name will be added to the classpath (relative to the job's work 
directory).
-   *
-   * If not a "local:" file and no alternate name, the environment is not 
modified.
-   *
-   * @param path      Path to add to classpath (optional).
-   * @param fileName  Alternate name for the file (optional).
-   * @param env       Map holding the environment variables.
-   */
-  private def addClasspathEntry(path: String, fileName: String,
-      env: HashMap[String, String]) : Unit = {
-    if (path != null) {
-      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val localPath = getLocalPath(path)
-        if (localPath != null) {
-          YarnSparkHadoopUtil.addToEnvironment(env, 
Environment.CLASSPATH.name, localPath,
-            File.pathSeparator)
-          return
-        }
-      }
-    }
-    if (fileName != null) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
-        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
-    }
-  }
-
-  /**
    * Returns the local path if the URI is a "local:" URI, or null otherwise.
    */
   private def getLocalPath(resource: String): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dba31402/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0ac1625..e01ed5a 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
       "--class", "notused",
-      "--jar", null,
+      "--jar", null, // The primary jar will be added dynamically in 
SparkContext.
       "--args", hostport,
       "--am-class", classOf[ExecutorLauncher].getName
     )

Reply via email to