Repository: spark
Updated Branches:
  refs/heads/master 8e253ebbf -> 336cd341e
Small refactoring to pass SparkEnv into Executor rather than creating SparkEnv in Executor.

This consolidates some code paths and makes constructor arguments simpler for a few classes.

Author: Reynold Xin <r...@databricks.com>

Closes #3738 from rxin/sparkEnvDepRefactor and squashes the following commits:

82e02cc [Reynold Xin] Fixed couple bugs.
217062a [Reynold Xin] Code review feedback.
bd00af7 [Reynold Xin] Small refactoring to pass SparkEnv into Executor rather than creating SparkEnv in Executor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/336cd341
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/336cd341
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/336cd341

Branch: refs/heads/master
Commit: 336cd341ee449098a1db594592a44f5ab9200fa0
Parents: 8e253eb
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Dec 19 12:51:12 2014 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Dec 19 12:51:12 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala   | 26 ++++++--------
 .../executor/CoarseGrainedExecutorBackend.scala  | 32 +++++++++--------
 .../org/apache/spark/executor/Executor.scala     | 36 ++++++--------------
 .../spark/executor/MesosExecutorBackend.scala    | 10 ++++--
 .../spark/scheduler/local/LocalBackend.scala     | 11 +++---
 5 files changed, 52 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
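[Editor's note] The shape of the change, independent of Spark's internals: before, Executor was handed a raw property list (plus an optional ActorSystem) and constructed its own environment; after, the caller constructs the SparkEnv once and injects it. Below is a minimal, self-contained Scala sketch of that before/after pattern using simplified stand-in types; Env, OldExecutor, and NewExecutor are illustrative names, not the real private[spark] classes.

    // Simplified stand-ins; the real SparkEnv/Executor carry far more state.
    final case class Env(conf: Map[String, String], serializerName: String)

    // Before: the executor received raw properties and built its environment internally.
    class OldExecutor(executorId: String, host: String, properties: Seq[(String, String)]) {
      private val env = Env(properties.toMap, serializerName = "java")
    }

    // After: the environment is created once by the caller and passed in (dependency injection).
    class NewExecutor(executorId: String, host: String, env: Env) {
      private val conf = env.conf  // just a reference, no second construction path
    }

    object RefactorSketch extends App {
      val env = Env(Map("spark.app.id" -> "app-20141219"), serializerName = "java")
      val executor = new NewExecutor("exec-1", "localhost", env)  // one shared Env instance
      println(executor.getClass.getSimpleName + " built with conf " + env.conf)
    }
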
http://git-wip-us.apache.org/repos/asf/spark/blob/336cd341/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f4215f2..6656df4 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -177,18 +177,18 @@ object SparkEnv extends Logging {
       hostname: String,
       port: Int,
       numCores: Int,
-      isLocal: Boolean,
-      actorSystem: ActorSystem = null): SparkEnv = {
-    create(
+      isLocal: Boolean): SparkEnv = {
+    val env = create(
       conf,
       executorId,
       hostname,
       port,
       isDriver = false,
       isLocal = isLocal,
-      defaultActorSystem = actorSystem,
       numUsableCores = numCores
     )
+    SparkEnv.set(env)
+    env
   }
 
   /**
@@ -202,7 +202,6 @@ object SparkEnv extends Logging {
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
-      defaultActorSystem: ActorSystem = null,
       numUsableCores: Int = 0): SparkEnv = {
 
     // Listener bus is only used on the driver
@@ -212,20 +211,17 @@ object SparkEnv extends Logging {
 
     val securityManager = new SecurityManager(conf)
 
-    // If an existing actor system is already provided, use it.
-    // This is the case when an executor is launched in coarse-grained mode.
-    val (actorSystem, boundPort) =
-      Option(defaultActorSystem) match {
-        case Some(as) => (as, port)
-        case None =>
-          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
-      }
+    // Create the ActorSystem for Akka and get the port it binds to.
+    val (actorSystem, boundPort) = {
+      val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+      AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
+    }
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
-    // This is so that we tell the executors the correct port to connect to.
     if (isDriver) {
       conf.set("spark.driver.port", boundPort.toString)
+    } else {
+      conf.set("spark.executor.port", boundPort.toString)
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf
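[Editor's note] The else branch added above is what later lets an executor discover its own Akka port: the actor system may be asked for port 0 (pick any free port), and the real port is only known after binding, so SparkEnv writes it back into the conf as spark.executor.port. The snippet below is a small, self-contained illustration of that bind-then-read-back pattern using a plain java.net.ServerSocket rather than Akka.

    import java.net.ServerSocket

    object BindPortSketch extends App {
      // Requesting port 0 lets the OS choose a free ephemeral port; the actual
      // port is only known after the bind, which is why it has to be read back
      // and recorded (as SparkEnv now does via spark.executor.port).
      val socket = new ServerSocket(0)
      val boundPort = socket.getLocalPort
      println(s"requested port 0, actually bound to $boundPort")
      socket.close()
    }
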
http://git-wip-us.apache.org/repos/asf/spark/blob/336cd341/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 5f46f3b..c794a7b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.concurrent.Await
 
-import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
+import akka.actor.{Actor, ActorSelection, Props}
 import akka.pattern.Patterns
 import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
 
@@ -38,8 +38,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)],
-    actorSystem: ActorSystem)
+    env: SparkEnv)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
@@ -58,8 +57,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
-        actorSystem)
+      executor = new Executor(executorId, hostname, env, isLocal = false)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -70,7 +68,7 @@
       logError("Received LaunchTask command but executor was null")
       System.exit(1)
     } else {
-      val ser = SparkEnv.get.closureSerializer.newInstance()
+      val ser = env.closureSerializer.newInstance()
       val taskDesc = ser.deserialize[TaskDescription](data.value)
       logInfo("Got assigned task " + taskDesc.taskId)
       executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
@@ -128,21 +126,25 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()
 
-      // Create a new ActorSystem using driver's Spark properties to run the backend.
+      // Create SparkEnv using properties we fetched from the driver.
       val driverConf = new SparkConf().setAll(props)
-      val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        SparkEnv.executorActorSystemName,
-        hostname, port, driverConf, new SecurityManager(driverConf))
-      // set it
+      val env = SparkEnv.createExecutorEnv(
+        driverConf, executorId, hostname, port, cores, isLocal = false)
+
+      // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
+      val boundPort = env.conf.getInt("spark.executor.port", 0)
+      assert(boundPort != 0)
+
+      // Start the CoarseGrainedExecutorBackend actor.
       val sparkHostPort = hostname + ":" + boundPort
-      actorSystem.actorOf(
+      env.actorSystem.actorOf(
        Props(classOf[CoarseGrainedExecutorBackend],
-          driverUrl, executorId, sparkHostPort, cores, props, actorSystem),
+          driverUrl, executorId, sparkHostPort, cores, env),
        name = "Executor")
      workerUrl.foreach { url =>
-        actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+        env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
      }
-      actorSystem.awaitTermination()
+      env.actorSystem.awaitTermination()
   }
 }
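[Editor's note] With the SparkEnv created up front, the backend hands it to the actor as an ordinary constructor argument through Akka's Props, replacing the earlier (sparkProperties, actorSystem) pair. Below is a minimal, self-contained sketch of that Props-with-constructor-arguments pattern; it assumes the Akka 2.3-era akka-actor artifact that Spark 1.x depends on, and Dep and Worker are illustrative names only.

    import akka.actor.{Actor, ActorSystem, Props}

    // Illustrative dependency; in the commit the injected value is the shared SparkEnv.
    final case class Dep(appId: String)

    class Worker(dep: Dep) extends Actor {
      def receive = {
        case msg => println(s"[${dep.appId}] received: $msg")
      }
    }

    object PropsSketch extends App {
      val system = ActorSystem("sketch")
      // Constructor arguments travel through Props, just as the backend now hands
      // env to CoarseGrainedExecutorBackend.
      val worker = system.actorOf(Props(classOf[Worker], Dep("app-20141219")), name = "worker")
      worker ! "hello"
      Thread.sleep(500)   // give the actor a moment to print before shutting down
      system.shutdown()   // Akka 2.3 API; later Akka versions use terminate()
    }
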
http://git-wip-us.apache.org/repos/asf/spark/blob/336cd341/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index da030f2..0f99cd9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
-import akka.actor.{Props, ActorSystem}
+import akka.actor.Props
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -42,10 +42,8 @@ import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
 private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
-    properties: Seq[(String, String)],
-    numCores: Int,
-    isLocal: Boolean = false,
-    actorSystem: ActorSystem = null)
+    env: SparkEnv,
+    isLocal: Boolean = false)
   extends Logging {
 
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -55,6 +53,8 @@ private[spark] class Executor(
 
   private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
+  private val conf = env.conf
+
   @volatile private var isStopped = false
 
   // No ip or host:port - just hostname
@@ -65,10 +65,6 @@ private[spark] class Executor(
   // Make sure the local hostname we report matches the cluster scheduler's name for this host
   Utils.setCustomHostname(slaveHostname)
 
-  // Set spark.* properties from executor arg
-  val conf = new SparkConf(true)
-  conf.setAll(properties)
-
   if (!isLocal) {
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
@@ -77,21 +73,11 @@
   }
 
   val executorSource = new ExecutorSource(this, executorId)
-
-  // Initialize Spark environment (using system properties read above)
   conf.set("spark.executor.id", executorId)
-  private val env = {
-    if (!isLocal) {
-      val port = conf.getInt("spark.executor.port", 0)
-      val _env = SparkEnv.createExecutorEnv(
-        conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
-      SparkEnv.set(_env)
-      _env.metricsSystem.registerSource(executorSource)
-      _env.blockManager.initialize(conf.getAppId)
-      _env
-    } else {
-      SparkEnv.get
-    }
+
+  if (!isLocal) {
+    env.metricsSystem.registerSource(executorSource)
+    env.blockManager.initialize(conf.getAppId)
   }
 
   // Create an actor for receiving RPCs from the driver
@@ -167,7 +153,7 @@ private[spark] class Executor(
     override def run() {
       val deserializeStartTime = System.currentTimeMillis()
      Thread.currentThread.setContextClassLoader(replClassLoader)
-      val ser = SparkEnv.get.closureSerializer.newInstance()
+      val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var taskStart: Long = 0
@@ -202,7 +188,7 @@ private[spark] class Executor(
           throw new TaskKilledException
         }
 
-        val resultSer = SparkEnv.get.serializer.newInstance()
+        val resultSer = env.serializer.newInstance()
         val beforeSerialization = System.currentTimeMillis()
         val valueBytes = resultSer.serialize(value)
         val afterSerialization = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/spark/blob/336cd341/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index f15e6bc..a098d07 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -25,7 +25,7 @@ import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.{Logging, TaskState}
+import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -64,11 +64,15 @@ private[spark] class MesosExecutorBackend
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
+    val conf = new SparkConf(loadDefaults = true).setAll(properties)
+    val port = conf.getInt("spark.executor.port", 0)
+    val env = SparkEnv.createExecutorEnv(
+      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false)
+
     executor = new Executor(
       executorId,
       slaveInfo.getHostname,
-      properties,
-      cpusPerTask)
+      env)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
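[Editor's note] SparkConf, unlike SparkEnv, is public API, so the property-to-conf step the Mesos backend now performs can be shown directly. A small sketch, assuming spark-core 1.x on the classpath and made-up property values:

    import org.apache.spark.SparkConf

    object ConfFromPropsSketch extends App {
      // Properties as they might arrive serialized from the scheduler (values made up).
      val properties: Seq[(String, String)] = Seq(
        "spark.app.id" -> "app-20141219",
        "spark.executor.port" -> "0")

      // Mirrors the backend: load defaults first, then overlay the pushed properties.
      val conf = new SparkConf(loadDefaults = true).setAll(properties)
      val port = conf.getInt("spark.executor.port", 0)
      println(s"SparkEnv will be asked to bind on port $port (0 means pick a free port)")
    }
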
http://git-wip-us.apache.org/repos/asf/spark/blob/336cd341/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index a2f1f14..b3bd311 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -41,17 +41,18 @@ private case class StopExecutor()
  * and the TaskSchedulerImpl.
  */
 private[spark] class LocalActor(
-  scheduler: TaskSchedulerImpl,
-  executorBackend: LocalBackend,
-  private val totalCores: Int) extends Actor with ActorLogReceive with Logging {
+    scheduler: TaskSchedulerImpl,
+    executorBackend: LocalBackend,
+    private val totalCores: Int)
+  extends Actor with ActorLogReceive with Logging {
 
   private var freeCores = totalCores
 
   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
   private val localExecutorHostname = "localhost"
 
-  val executor = new Executor(
-    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)
+  private val executor = new Executor(
+    localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
 
   override def receiveWithLogging = {
     case ReviveOffers =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org