Repository: spark
Updated Branches:
  refs/heads/master d934801d5 -> 148af6082


[SPARK-2454] Do not ship spark home to Workers

When standalone Workers launch executors, they inherit the Spark home set by 
the driver. If the worker machines do not share the driver node's directory 
structure, the Workers therefore attempt to run scripts (e.g. 
bin/compute-classpath.sh) that do not exist locally, and the executors fail to 
launch. This is a common scenario when the driver is launched from outside the 
cluster.
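
As a rough sketch of the failure mode, assuming a hypothetical driver-side
path (not taken from this commit): the executor launch used to be rooted at
the Spark home shipped by the driver, so the Worker could end up probing a
directory that only exists on the driver machine.

  import java.io.File

  // Hypothetical illustration: the driver's Spark home need not exist here.
  val driverSparkHome = "/home/driver-user/spark"  // path shipped from the driver
  val script = new File(driverSparkHome, "bin/compute-classpath.sh")
  require(script.exists(), s"$script not found on this worker; executor launch fails")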

The solution is simply not to pass the driver's Spark home to the Workers. This 
PR also avoids overloading `spark.home`, which is now used only for setting the 
executor Spark home on Mesos and in Python.
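
A minimal sketch of the worker-side resolution adopted here, mirroring the
Worker.scala hunk below (`spark.test.home` is only set by the test harness):

  import java.io.File

  // The Worker derives Spark home from its own environment or working
  // directory instead of trusting a path shipped by the driver.
  val workerSparkHome: File = new File(
    sys.props.get("spark.test.home")
      .orElse(sys.env.get("SPARK_HOME"))
      .getOrElse("."))

For Mesos and for PySpark, `spark.home` (set e.g. via SparkConf.setSparkHome
or the sparkHome argument of the Python SparkContext) remains relevant for
pointing executors at a Spark installation.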

This is based on top of #1392; the issue was originally reported by YanTangZhai. 
Tested on a standalone cluster.

Author: Andrew Or <andrewo...@gmail.com>

Closes #1734 from andrewor14/spark-home-reprise and squashes the following 
commits:

f71f391 [Andrew Or] Revert changes in python
1c2532c [Andrew Or] Merge branch 'master' of github.com:apache/spark into spark-home-reprise
188fc5d [Andrew Or] Avoid using spark.home where possible
09272b7 [Andrew Or] Always use Worker's working directory as spark home


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/148af608
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/148af608
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/148af608

Branch: refs/heads/master
Commit: 148af6082cdb44840bbd61c7a4f67a95badad10b
Parents: d934801
Author: Andrew Or <andrewo...@gmail.com>
Authored: Sat Aug 2 00:45:38 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sat Aug 2 00:45:38 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/ApplicationDescription.scala      | 1 -
 .../src/main/scala/org/apache/spark/deploy/JsonProtocol.scala | 1 -
 .../scala/org/apache/spark/deploy/client/TestClient.scala     | 5 ++---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    | 7 +++----
 .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 3 +--
 core/src/test/scala/org/apache/spark/DriverSuite.scala        | 2 +-
 .../scala/org/apache/spark/deploy/JsonProtocolSuite.scala     | 5 ++---
 .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +-
 .../org/apache/spark/deploy/worker/ExecutorRunnerTest.scala   | 7 +++----
 project/SparkBuild.scala                                      | 2 +-
 python/pyspark/context.py                                     | 2 +-
 repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala    | 3 ---
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala    | 1 -
 13 files changed, 15 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 86305d2..65a1a8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: Option[String],
     var appUiUrl: String,
     val eventLogDir: Option[String] = None)
   extends Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index c4f5e29..696f32a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
     ("user" -> obj.user) ~
-    ("sparkhome" -> obj.sparkHome) ~
     ("command" -> obj.command.toString)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index b8ffa9a..88a0862 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,9 +48,8 @@ private[spark] object TestClient {
     val conf = new SparkConf
     val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
-    val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
-        Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+    val desc = new ApplicationDescription("TestClient", Some(1), 512,
+      Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fb5252d..c6ea42f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -81,7 +81,8 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+  val sparkHome =
+    new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -233,9 +234,7 @@ private[spark] class Worker(
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, 
execId, appDesc.name))
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, 
memory_,
-            self, workerId, host,
-            appDesc.sparkHome.map(userSparkHome => new 
File(userSparkHome)).getOrElse(sparkHome),
-            workDir, akkaUrl, conf, ExecutorState.RUNNING)
+            self, workerId, host, sparkHome, workDir, akkaUrl, conf, 
ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 48aaaa5..a28446f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
-    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index de4bd90..e36902e 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ import scala.language.postfixOps
 class DriverSuite extends FunSuite with Timeouts {
 
   test("driver should exit after finishing") {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 093394a..31aa7ec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
-    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
   }
 
   def createAppInfo() : ApplicationInfo = {
@@ -169,8 +169,7 @@ object JsonConstants {
   val appDescJsonStr =
     """
       |{"name":"name","cores":4,"memoryperslave":1234,
-      |"user":"%s","sparkhome":"sparkHome",
-      |"command":"Command(mainClass,List(arg1, 
arg2),Map(),List(),List(),List())"}
+      |"user":"%s","command":"Command(mainClass,List(arg1, 
arg2),Map(),List(),List(),List())"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin
 
   val executorRunnerJsonStr =

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 9190b05..8126ef1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     Utils.executeAndGetOutput(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index ca4d987..149a2b3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+    val sparkHome = sys.props("spark.test.home")
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
-      sparkHome, "appUiUrl")
+      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
       f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
 
     assert(er.getCommandSeq.last === appId)

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a8bbd55..1d7cc6d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -328,7 +328,7 @@ object TestSettings {
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
     fork := true,
-    javaOptions in Test += "-Dspark.home=" + sparkHome,
+    javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 7b0f8d8..2e80eb5 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -84,7 +84,7 @@ class SparkContext(object):
         @param serializer: The serializer for RDDs.
         @param conf: A L{SparkConf} object setting Spark properties.
         @param gateway: Use an existing gateway and JVM, otherwise a new JVM
-               will be instatiated.
+               will be instantiated.
 
 
         >>> from pyspark.context import SparkContext

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 42c7e51..65788f4 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -969,9 +969,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"))
-    }
     sparkContext = new SparkContext(conf)
     logInfo("Created spark context..")
     sparkContext

http://git-wip-us.apache.org/repos/asf/spark/blob/148af608/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index ac56ff7..b780282 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -35,7 +35,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   extends Logging with Serializable {
   val master = ssc.sc.master
   val framework = ssc.sc.appName
-  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
   val jars = ssc.sc.jars
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir

