Repository: spark
Updated Branches:
  refs/heads/master f1bf0d2f3 -> 63470afc9


[SPARK-15782][YARN] Fix spark.jars and spark.yarn.dist.jars handling

When `--packages` is specified with spark-shell, the classes from those
packages cannot be found, which I think is due to some of the changes in
SPARK-12343.
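
For illustration (the package coordinates below are only an example, not taken
from the original report), the failure mode looks like this:

    ./bin/spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
    scala> import com.databricks.spark.csv._   // "not found" before this fix

The downloaded jars are recorded in spark.jars (and, on YARN, in
spark.yarn.dist.jars), but apparently were not making it onto the REPL
classpath.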

Tested manually with both the Scala 2.10 and 2.11 REPLs.

vanzin davies, can you please review?

Author: Marcelo Vanzin <van...@cloudera.com>
Author: Nezih Yigitbasi <nyigitb...@netflix.com>

Closes #13709 from nezihyigitbasi/SPARK-15782.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63470afc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63470afc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63470afc

Branch: refs/heads/master
Commit: 63470afc997fb9d6b6f8a911c25964743556c9cc
Parents: f1bf0d2
Author: Nezih Yigitbasi <nyigitb...@netflix.com>
Authored: Thu Jun 16 18:19:29 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Jun 16 18:20:16 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 25 +++++++++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 12 ++++++++
 .../org/apache/spark/repl/SparkILoop.scala      | 32 ++++++++++++--------
 .../main/scala/org/apache/spark/repl/Main.scala |  4 +--
 5 files changed, 59 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/63470afc/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d56946e..d870181 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
-    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    _jars = Utils.getUserJars(_conf)
     _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
       .toSeq.flatten
 

http://git-wip-us.apache.org/repos/asf/spark/blob/63470afc/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f9d0540..17d193b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2352,6 +2352,31 @@ private[spark] object Utils extends Logging {
     log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
     SignalUtils.registerLogger(log)
   }
+
+  /**
+   * Unions two comma-separated lists of files and filters out empty strings.
+   */
+  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
+    var allFiles = Set[String]()
+    leftList.foreach { value => allFiles ++= value.split(",") }
+    rightList.foreach { value => allFiles ++= value.split(",") }
+    allFiles.filter { _.nonEmpty }
+  }
+
+  /**
+   * In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
+   * "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
+   * only the "spark.jars" property.
+   */
+  def getUserJars(conf: SparkConf): Seq[String] = {
+    val sparkJars = conf.getOption("spark.jars")
+    if (conf.get("spark.master") == "yarn") {
+      val yarnJars = conf.getOption("spark.yarn.dist.jars")
+      unionFileLists(sparkJars, yarnJars).toSeq
+    } else {
+      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    }
+  }
 }
 
 /**
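
For intuition, here is a minimal standalone sketch of the new semantics (plain
Scala with illustrative names; the real code paths go through SparkConf):

    // Mirrors the intent of Utils.unionFileLists: union two comma-separated
    // lists, dropping empty entries and duplicates.
    def union(left: Option[String], right: Option[String]): Set[String] = {
      val all = left.toSeq.flatMap(_.split(",")) ++ right.toSeq.flatMap(_.split(","))
      all.filter(_.nonEmpty).toSet
    }

    union(Some("/tmp/a.jar,/tmp/b.jar"), Some("/tmp/b.jar,"))
    // => Set(/tmp/a.jar, /tmp/b.jar)

With getUserJars, this union of "spark.jars" and "spark.yarn.dist.jars" is
applied only when spark.master is "yarn"; in all other modes just "spark.jars"
is parsed.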

http://git-wip-us.apache.org/repos/asf/spark/blob/63470afc/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 2718976..0b02059 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,18 @@ class SparkSubmitSuite
       appArgs.executorMemory should be ("2.3g")
     }
   }
+
+  test("comma separated list of files are unioned correctly") {
+    val left = Option("/tmp/a.jar,/tmp/b.jar")
+    val right = Option("/tmp/c.jar,/tmp/a.jar")
+    val emptyString = Option("")
+    Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
+    Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
+    Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
+  }
   // scalastyle:on println
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.

http://git-wip-us.apache.org/repos/asf/spark/blob/63470afc/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b1e95d8..8fcab38 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -201,10 +201,10 @@ class SparkILoop(
       if (Utils.isWindows) {
         // Strip any URI scheme prefix so we can add the correct path to the classpath
         // e.g. file:/C:/my/path.jar -> C:/my/path.jar
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") }
+        getAddedJars().map { jar => new URI(jar).getPath.stripPrefix("/") }
       } else {
         // We need new URI(jar).getPath here for the case that `jar` includes encoded white space (%20).
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath }
+        getAddedJars().map { jar => new URI(jar).getPath }
       }
     // work around for Scala bug
     val totalClassPath = addedJars.foldLeft(
@@ -1005,7 +1005,7 @@ class SparkILoop(
   @DeveloperApi
   def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = SparkILoop.getAddedJars
+    val jars = getAddedJars()
     val conf = new SparkConf()
       .setMaster(getMaster())
       .setJars(jars)
@@ -1060,22 +1060,30 @@ class SparkILoop(
 
   @deprecated("Use `process` instead", "2.9.0")
   private def main(settings: Settings): Unit = process(settings)
-}
 
-object SparkILoop extends Logging {
-  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
-  private def echo(msg: String) = Console println msg
-
-  def getAddedJars: Array[String] = {
+  private[repl] def getAddedJars(): Array[String] = {
+    val conf = new SparkConf().setMaster(getMaster())
     val envJars = sys.env.get("ADD_JARS")
     if (envJars.isDefined) {
       logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
     }
-    val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
-    val jars = propJars.orElse(envJars).getOrElse("")
+    val jars = {
+      val userJars = Utils.getUserJars(conf)
+      if (userJars.isEmpty) {
+        envJars.getOrElse("")
+      } else {
+        userJars.mkString(",")
+      }
+    }
     Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
   }
 
+}
+
+object SparkILoop extends Logging {
+  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
+  private def echo(msg: String) = Console println msg
+
   // Designed primarily for use by test code: take a String with a
   // bunch of code, and prints out a transcript of what it would look
   // like if you'd just typed it into the repl.
@@ -1109,7 +1117,7 @@ object SparkILoop extends Logging {
         if (settings.classpath.isDefault)
           settings.classpath.value = sys.props("java.class.path")
 
-        getAddedJars.map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))
+        repl.getAddedJars().map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))
 
         repl process settings
       }
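
The precedence in the relocated getAddedJars() above is worth noting: jars
from the configuration win, and the deprecated ADD_JARS environment variable
is consulted only when the configuration yields nothing. A hedged sketch of
just that fallback (illustrative helper, not the actual method):

    // Configured user jars first; ADD_JARS only as a fallback.
    def pickJars(userJars: Seq[String], envJars: Option[String]): Array[String] = {
      val joined = if (userJars.isEmpty) envJars.getOrElse("") else userJars.mkString(",")
      joined.split(",").filter(_.nonEmpty)
    }

    pickJars(Seq("/tmp/a.jar"), Some("/tmp/env.jar"))  // => Array(/tmp/a.jar)
    pickJars(Seq.empty, Some("/tmp/env.jar"))          // => Array(/tmp/env.jar)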

http://git-wip-us.apache.org/repos/asf/spark/blob/63470afc/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 771670f..28fe84d 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -54,9 +54,7 @@ object Main extends Logging {
   // Visible for testing
   private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
     interp = _interp
-    val jars = conf.getOption("spark.jars")
-      .map(_.replace(",", File.pathSeparator))
-      .getOrElse("")
+    val jars = Utils.getUserJars(conf).mkString(File.pathSeparator)
     val interpArguments = List(
       "-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",

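Note that the Scala 2.11 REPL joins the jar list with File.pathSeparator rather
than commas, since the resulting string feeds the interpreter's classpath. A
quick sanity check (not part of the patch):

    import java.io.File
    Seq("/tmp/a.jar", "/tmp/b.jar").mkString(File.pathSeparator)
    // => "/tmp/a.jar:/tmp/b.jar" on Linux/macOS (';' on Windows)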
