Repository: spark
Updated Branches:
  refs/heads/master 8fff0f92a -> 07f1c5447


[SPARK-13577][YARN] Allow Spark jar to be multiple jars, archive.

In preparation for the demise of assemblies, this change allows the
YARN backend to use multiple jars and globs as the "Spark jar". The
config option has been renamed to "spark.yarn.jars" to reflect that.
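
As a rough illustration of how a multi-entry "Spark jar" list is treated, the standalone sketch below mirrors the split made in the Client.scala hunk further down: entries with the "local:" scheme stay in the configuration and go onto the container classpath, while all other entries are candidates for upload. The object name JarListSketch and the helper splitJars are hypothetical, and the sketch does not resolve globs the way the real code does.

```scala
object JarListSketch {
  // Same scheme name the patch refers to as Client.LOCAL_SCHEME.
  val LocalScheme = "local"

  /** Returns whether the URI is a "local:" URI (same check as the patch's isLocalUri). */
  def isLocalUri(uri: String): Boolean = uri.startsWith(s"$LocalScheme:")

  /**
   * Hypothetical helper: split the configured entries into
   * (entries to upload to the distributed cache, entries already present on every node).
   */
  def splitJars(jars: Seq[String]): (Seq[String], Seq[String]) = {
    val (local, toUpload) = jars.partition(isLocalUri)
    (toUpload, local)
  }

  def main(args: Array[String]): Unit = {
    val (toUpload, local) = splitJars(Seq(
      "hdfs:///some/path/*.jar",
      "/opt/spark/lib/extra.jar",
      "local:/opt/spark/lib/*"))
    println(s"upload: $toUpload")
    println(s"keep in spark.yarn.jars: $local")
  }
}
```

The paths in main are made-up examples; only the "local:" prefix check is taken from the patch.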

A second option "spark.yarn.archive" was also added; if set, this
takes precedence and uploads an archive expected to contain the jar
files with the Spark code and its dependencies.

Most existing deployments should keep working, with two exceptions.
This change drops support for the "SPARK_JAR" environment variable,
and when no configuration is set it no longer falls back to
"jarOfClass"; it looks for files under SPARK_HOME instead. This
should be fine since "jarOfClass" probably wouldn't work unless you
were using spark-submit anyway.
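
The lookup order described above can be sketched as a small standalone function. This is only an approximation under stated assumptions: the names SparkLibsSketch, LibSource, and resolve are hypothetical, the real code distributes files instead of returning a value, and the fallback directory is "lib" under SPARK_HOME as in the Client.scala hunk below.

```scala
import java.io.File

object SparkLibsSketch {
  // Hypothetical result type describing where the Spark libraries come from.
  sealed trait LibSource
  final case class Archive(uri: String) extends LibSource
  final case class JarList(entries: Seq[String]) extends LibSource
  final case class HomeFallback(jars: Seq[File]) extends LibSource

  /** Archive wins over the jar list; with neither set, scan SPARK_HOME/lib for jars. */
  def resolve(
      archive: Option[String],
      jars: Option[Seq[String]],
      sparkHome: Option[String]): LibSource = {
    (archive, jars) match {
      case (Some(uri), _) =>
        // The patch rejects "local:" archives with a require() as well.
        require(!uri.startsWith("local:"), "spark.yarn.archive cannot be a local URI.")
        Archive(uri)
      case (None, Some(entries)) =>
        JarList(entries)
      case (None, None) =>
        val dir = sparkHome.map(home => new File(home, "lib")).filter(_.isDirectory)
        // listFiles() may return null on I/O errors, so wrap it in Option.
        val found = dir.flatMap(d => Option(d.listFiles())).map(_.toSeq).getOrElse(Seq.empty)
        HomeFallback(found.filter(f => f.isFile && f.getName.toLowerCase.endsWith(".jar")))
    }
  }
}
```

Note that neither the "SPARK_JAR" environment variable nor "jarOfClass" appears anywhere in this order, which matches the two dropped behaviours.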

Tested with the unit tests, and by trying the different config options
on a YARN cluster.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #11500 from vanzin/SPARK-13577.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f1c544
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f1c544
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f1c544

Branch: refs/heads/master
Commit: 07f1c5447753a3d593cd6ececfcb03c11b1cf8ff
Parents: 8fff0f9
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Mar 11 07:54:57 2016 -0600
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Fri Mar 11 07:54:57 2016 -0600

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |   4 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |   1 -
 docs/running-on-yarn.md                         |  25 ++-
 .../org/apache/spark/deploy/yarn/Client.scala   | 108 +++++++++----
 .../org/apache/spark/deploy/yarn/config.scala   |  10 +-
 .../deploy/yarn/BaseYarnClusterSuite.scala      |   3 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 153 ++++++++++++++-----
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  |   3 +-
 8 files changed, 227 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index aaccf49..ff8c631 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -656,7 +656,9 @@ private[spark] object SparkConf extends Logging {
     "spark.memory.offHeap.enabled" -> Seq(
       AlternateConfig("spark.unsafe.offHeap", "1.6")),
     "spark.rpc.message.maxSize" -> Seq(
-      AlternateConfig("spark.akka.frameSize", "1.6"))
+      AlternateConfig("spark.akka.frameSize", "1.6")),
+    "spark.yarn.jars" -> Seq(
+      AlternateConfig("spark.yarn.jar", "2.0"))
     )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e8d0c3f..4049fc0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -626,7 +626,6 @@ object SparkSubmit {
     val pathConfigs = Seq(
       "spark.jars",
       "spark.files",
-      "spark.yarn.jar",
       "spark.yarn.dist.files",
       "spark.yarn.dist.archives")
     pathConfigs.foreach { config =>

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ad66b9f..8045f8c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -272,14 +272,25 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.jar</code></td>
+  <td><code>spark.yarn.jars</code></td>
   <td>(none)</td>
   <td>
-    The location of the Spark jar file, in case overriding the default location is desired.
-    By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be
+    List of libraries containing Spark code to distribute to YARN containers.
+    By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be
     in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't
-    need to be distributed each time an application runs. To point to a jar on HDFS, for example,
-    set this configuration to <code>hdfs:///some/path</code>.
+    need to be distributed each time an application runs. To point to jars on HDFS, for example,
+    set this configuration to <code>hdfs:///some/path</code>. Globs are allowed.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.archive</code></td>
+  <td>(none)</td>
+  <td>
+    An archive containing needed Spark jars for distribution to the YARN cache. If set, this
+    configuration replaces <code>spark.yarn.jars</code> and the archive is used in all the
+    application's containers. The archive should contain jar files in its root directory.
+    Like with the previous option, the archive can also be hosted on HDFS to speed up file
+    distribution.
   </td>
 </tr>
 <tr>
@@ -288,8 +299,8 @@ If you need a reference to the proper location to put log files in the YARN so t
   <td>
     A comma-separated list of secure HDFS namenodes your Spark application is going to access. For
     example, <code>spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
-    webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed 
-    and Kerberos must be properly configured to be able to access them (either in the same realm 
+    webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed
+    and Kerberos must be properly configured to be able to access them (either in the same realm
     or in a trusted realm). Spark acquires security tokens for each of the namenodes so that
     the Spark application can access those remote HDFS clusters.
   </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6ca9669..0b5ceb7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -423,7 +423,63 @@ private[spark] class Client(
     }
 
     /**
-     * Copy the given main resource to the distributed cache if the scheme is not "local".
+     * Add Spark to the cache. There are two settings that control what files to add to the cache:
+     * - if a Spark archive is defined, use the archive. The archive is expected to contain
+     *   jar files at its root directory.
+     * - if a list of jars is provided, filter the non-local ones, resolve globs, and
+     *   add the found files to the cache.
+     *
+     * Note that the archive cannot be a "local" URI. If none of the above settings are found,
+     * then upload all files found in $SPARK_HOME/jars.
+     *
+     * TODO: currently the code looks in $SPARK_HOME/lib while the work to replace assemblies
+     * with a directory full of jars is ongoing.
+     */
+    val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
+    if (sparkArchive.isDefined) {
+      val archive = sparkArchive.get
+      require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+      distribute(Utils.resolveURI(archive).toString,
+        resType = LocalResourceType.ARCHIVE,
+        destName = Some(LOCALIZED_LIB_DIR))
+    } else {
+      sparkConf.get(SPARK_JARS) match {
+        case Some(jars) =>
+          // Break the list of jars to upload, and resolve globs.
+          val localJars = new ArrayBuffer[String]()
+          jars.foreach { jar =>
+            if (!isLocalUri(jar)) {
+              val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+              val pathFs = FileSystem.get(path.toUri(), hadoopConf)
+              pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
+                distribute(entry.getPath().toUri().toString(),
+                  targetDir = Some(LOCALIZED_LIB_DIR))
+              }
+            } else {
+              localJars += jar
+            }
+          }
+
+          // Propagate the local URIs to the containers using the configuration.
+          sparkConf.set(SPARK_JARS, localJars)
+
+        case None =>
+          // No configuration, so fall back to uploading local jar files.
+          logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
+            "to uploading libraries under SPARK_HOME.")
+          val jarsDir = new File(sparkConf.getenv("SPARK_HOME"), "lib")
+          if (jarsDir.isDirectory()) {
+            jarsDir.listFiles().foreach { f =>
+              if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
+                distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+              }
+            }
+          }
+      }
+    }
+
+    /**
+     * Copy a few resources to the distributed cache if their scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
      * Each resource is represented by a 3-tuple of:
      *   (1) destination resource name,
@@ -431,8 +487,7 @@ private[spark] class Client(
      *   (3) Spark property key to set if the scheme is not local
      */
     List(
-      (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key),
-      (APP_JAR_NAME, args.userJar, APP_JAR.key),
+      (APP_JAR_NAME, args.userJar, APP_JAR),
       ("log4j.properties", oldLog4jConf.orNull, null)
     ).foreach { case (destName, path, confKey) =>
       if (path != null && !path.trim().isEmpty()) {
@@ -1062,8 +1117,7 @@ object Client extends Logging {
     new Client(args, sparkConf).run()
   }
 
-  // Alias for the Spark assembly jar and the user jar
-  val SPARK_JAR_NAME: String = "__spark__.jar"
+  // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
 
   // URI scheme that identifies local resources
@@ -1072,8 +1126,6 @@ object Client extends Logging {
   // Staging directory for any temporary jars or files
   val SPARK_STAGING: String = ".sparkStaging"
 
-  // Location of any user-defined Spark jars
-  val ENV_SPARK_JAR = "SPARK_JAR"
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission =
@@ -1095,28 +1147,8 @@ object Client extends Logging {
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
-  /**
-   * Find the user-defined Spark jar if configured, or return the jar containing this
-   * class if not.
-   *
-   * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the
-   * user environment if that is not found (for backwards compatibility).
-   */
-  private def sparkJar(conf: SparkConf): String = {
-    conf.get(SPARK_JAR).getOrElse(
-      if (System.getenv(ENV_SPARK_JAR) != null) {
-        logWarning(
-          s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
-            s"in favor of the ${SPARK_JAR.key} configuration variable.")
-        System.getenv(ENV_SPARK_JAR)
-      } else {
-        SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
-          + "find jar containing Spark classes. The jar can be defined using the "
-          + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option "
-          + "or make sure SPARK_PREPEND_CLASSES is not set."))
-      }
-    )
-  }
+  // Subdirectory where Spark libraries will be placed.
+  val LOCALIZED_LIB_DIR = "__spark_libs__"
 
   /**
    * Return the path to the given application's staging directory.
@@ -1236,7 +1268,18 @@ object Client extends Logging {
         addFileToClasspath(sparkConf, conf, x, null, env)
       }
     }
-    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env)
+
+    // Add the Spark jars to the classpath, depending on how they were distributed.
+    addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+      LOCALIZED_LIB_DIR, "*"), env)
+    if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+      sparkConf.get(SPARK_JARS).foreach { jars =>
+        jars.filter(isLocalUri).foreach { jar =>
+          addClasspathEntry(getClusterPath(sparkConf, jar), env)
+        }
+      }
+    }
+
     populateHadoopClasspath(conf, env)
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1392,4 +1435,9 @@ object Client extends Logging {
     components.mkString(Path.SEPARATOR)
   }
 
+  /** Returns whether the URI is a "local:" URI. */
+  def isLocalUri(uri: String): Boolean = {
+    uri.startsWith(s"$LOCAL_SCHEME:")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 06c1be9..10cd6d0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -72,11 +72,17 @@ package object config {
 
   /* File distribution. */
 
-  private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar")
-    .doc("Location of the Spark jar to use.")
+  private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
+    .doc("Location of archive containing jars files with Spark classes.")
     .stringConf
     .optional
 
+  private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
+    .doc("Location of jars containing Spark classes.")
+    .stringConf
+    .toSequence
+    .optional
+
   private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
     .stringConf
     .optional

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 272e245..b12e506 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.launcher._
 import org.apache.spark.util.Utils
 
@@ -202,7 +203,7 @@ abstract class BaseYarnClusterSuite
       extraClassPath: Seq[String] = Nil,
       extraConf: Map[String, String] = Map()): String = {
     val props = new Properties()
-    props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+    props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath())
 
     val testClasspath = new TestClasspathBuilder()
       .buildClassPath(

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b57c179..24472e0 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -36,17 +36,19 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClientApplication
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
-import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, Matchers}
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.util.{ResetSystemProperties, Utils}
+import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   with ResetSystemProperties {
 
+  import Client._
+
   var oldSystemProperties: Properties = null
 
   override def beforeAll(): Unit = {
@@ -65,35 +67,35 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   }
 
   test("default Yarn application classpath") {
-    Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+    getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
   }
 
   test("default MR application classpath") {
-    Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+    getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
   }
 
   test("resultant classpath for an application that defines a classpath for YARN") {
     withAppConf(Fixtures.mapYARNAppConf) { conf =>
       val env = newEnv
-      Client.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
       classpath(env) should be(
-        flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath))
+        flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
     }
   }
 
   test("resultant classpath for an application that defines a classpath for MR") {
     withAppConf(Fixtures.mapMRAppConf) { conf =>
       val env = newEnv
-      Client.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
       classpath(env) should be(
-        flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+        flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
     }
   }
 
   test("resultant classpath for an application that defines both classpaths, YARN and MR") {
     withAppConf(Fixtures.mapAppConf) { conf =>
       val env = newEnv
-      Client.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
       classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
     }
   }
@@ -102,47 +104,43 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   private val USER = "local:/userJar"
   private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
 
+  private val PWD =
+    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+      "{{PWD}}"
+    } else if (Utils.isWindows) {
+      "%PWD%"
+    } else {
+      Environment.PWD.$()
+    }
+
   test("Local jar URIs") {
     val conf = new Configuration()
     val sparkConf = new SparkConf()
-      .set(SPARK_JAR, SPARK)
+      .set(SPARK_JARS, Seq(SPARK))
       .set(USER_CLASS_PATH_FIRST, true)
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    Client.populateClasspath(args, conf, sparkConf, env, true)
+    populateClasspath(args, conf, sparkConf, env, true)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
       val uri = new URI(entry)
-      if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
+      if (LOCAL_SCHEME.equals(uri.getScheme())) {
         cp should contain (uri.getPath())
       } else {
         cp should not contain (uri.getPath())
       }
     })
-    val pwdVar =
-      if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-        "{{PWD}}"
-      } else if (Utils.isWindows) {
-        "%PWD%"
-      } else {
-        Environment.PWD.$()
-      }
-    cp should contain(pwdVar)
-    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
-    cp should not contain (Client.SPARK_JAR_NAME)
-    cp should not contain (Client.APP_JAR_NAME)
+    cp should contain(PWD)
+    cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}")
+    cp should not contain (APP_JAR)
   }
 
   test("Jar path propagation through SparkConf") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf().set(SPARK_JAR, SPARK)
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-    val client = spy(new Client(args, conf, sparkConf))
-    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort())
+    val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
+    val client = createClient(sparkConf,
+      args = Array("--jar", USER, "--addJars", ADDED))
 
     val tempDir = Utils.createTempDir()
     try {
@@ -154,7 +152,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       val expected = ADDED.split(",")
         .map(p => {
           val uri = new URI(p)
-          if (Client.LOCAL_SCHEME == uri.getScheme()) {
+          if (LOCAL_SCHEME == uri.getScheme()) {
             p
           } else {
             Option(uri.getFragment()).getOrElse(new File(p).getName())
@@ -171,16 +169,16 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   test("Cluster path translation") {
     val conf = new Configuration()
     val sparkConf = new SparkConf()
-      .set(SPARK_JAR.key, "local:/localPath/spark.jar")
+      .set(SPARK_JARS, Seq("local:/localPath/spark.jar"))
       .set(GATEWAY_ROOT_PATH, "/localPath")
       .set(REPLACEMENT_ROOT_PATH, "/remotePath")
 
-    Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
-    Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
+    getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
+    getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
       "/remotePath/1:/remotePath/2")
 
     val env = new MutableHashMap[String, String]()
-    Client.populateClasspath(null, conf, sparkConf, env, false,
+    populateClasspath(null, conf, sparkConf, env, false,
       extraClassPath = Some("/localPath/my1.jar"))
     val cp = classpath(env)
     cp should contain ("/remotePath/spark.jar")
@@ -220,6 +218,70 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     appContext.getMaxAppAttempts should be (42)
   }
 
+  test("spark.yarn.jars with multiple paths and globs") {
+    val libs = Utils.createTempDir()
+    val single = Utils.createTempDir()
+    val jar1 = TestUtils.createJarWithFiles(Map(), libs)
+    val jar2 = TestUtils.createJarWithFiles(Map(), libs)
+    val jar3 = TestUtils.createJarWithFiles(Map(), single)
+    val jar4 = TestUtils.createJarWithFiles(Map(), single)
+
+    val jarsConf = Seq(
+      s"${libs.getAbsolutePath()}/*",
+      jar3.getPath(),
+      s"local:${jar4.getPath()}",
+      s"local:${single.getAbsolutePath()}/*")
+
+    val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf)
+    val client = createClient(sparkConf)
+
+    val tempDir = Utils.createTempDir()
+    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+
+    assert(sparkConf.get(SPARK_JARS) ===
+      Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
+
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+
+    val cp = classpath(client)
+    cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+    cp should not contain (jar3.getPath())
+    cp should contain (jar4.getPath())
+    cp should contain (buildPath(single.getAbsolutePath(), "*"))
+  }
+
+  test("distribute jars archive") {
+    val temp = Utils.createTempDir()
+    val archive = TestUtils.createJarWithFiles(Map(), temp)
+
+    val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
+    val client = createClient(sparkConf)
+    client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+    classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+
+    sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
+    intercept[IllegalArgumentException] {
+      client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+    }
+  }
+
+  test("distribute local spark jars") {
+    val temp = Utils.createTempDir()
+    val jarsDir = new File(temp, "lib")
+    assert(jarsDir.mkdir())
+    val jar = TestUtils.createJarWithFiles(Map(), jarsDir)
+
+    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
+    val client = createClient(sparkConf)
+    client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+    classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =
@@ -280,4 +342,21 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     }.toOption.getOrElse(defaults)
   }
 
+  private def createClient(
+      sparkConf: SparkConf,
+      conf: Configuration = new Configuration(),
+      args: Array[String] = Array()): Client = {
+    val clientArgs = new ClientArguments(args, sparkConf)
+    val client = spy(new Client(clientArgs, conf, sparkConf))
+    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+      any(classOf[Path]), anyShort())
+    client
+  }
+
+  private def classpath(client: Client): Array[String] = {
+    val env = new MutableHashMap[String, String]()
+    populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+    classpath(env)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/07f1c544/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 1dd2f93..0587444 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -29,6 +29,7 @@ import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.RpcEndpointRef
@@ -55,7 +56,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   val sparkConf = new SparkConf()
   sparkConf.set("spark.driver.host", "localhost")
   sparkConf.set("spark.driver.port", "4040")
-  sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+  sparkConf.set(SPARK_JARS, Seq("notarealjar.jar"))
   sparkConf.set("spark.yarn.launchContainers", "false")
 
   val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)


