Repository: spark Updated Branches: refs/heads/master 098f83c7c -> b563987e8
[SPARK-4013] Do not create multiple actor systems on each executor In the existing code, each coarse-grained executor has two concurrently running actor systems. This causes many more error messages to be logged than necessary when the executor is lost or killed because we receive a disassociation event for each of these actor systems. This is blocking #2840. Author: Andrew Or <andrewo...@gmail.com> Closes #2863 from andrewor14/executor-actor-system and squashes the following commits: 44ce2e0 [Andrew Or] Avoid starting two actor systems on each executor Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b563987e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b563987e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b563987e Branch: refs/heads/master Commit: b563987e8dffc2aed1a834d555589a41cfb2a706 Parents: 098f83c Author: Andrew Or <andrewo...@gmail.com> Authored: Fri Oct 24 13:32:23 2014 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Fri Oct 24 13:32:23 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 12 ++--- .../main/scala/org/apache/spark/SparkEnv.scala | 49 ++++++++++++++++++-- .../executor/CoarseGrainedExecutorBackend.scala | 11 +++-- .../org/apache/spark/executor/Executor.scala | 11 +++-- 4 files changed, 61 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 55602a9..4565832 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging { // An asynchronous listener bus for Spark events private[spark] val listenerBus = new LiveListenerBus - // Create the Spark execution environment (cache, map output tracker, etc) conf.set("spark.executor.id", "driver") - private[spark] val env = SparkEnv.create( - conf, - "<driver>", - conf.get("spark.driver.host"), - conf.get("spark.driver.port").toInt, - isDriver = true, - isLocal = isLocal, - listenerBus = listenerBus) + + // Create the Spark execution environment (cache, map output tracker, etc) + private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus) SparkEnv.set(env) // Used to store a URL for each static file/jar together with the file's local timestamp http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 906a00b..5c076e5 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -144,14 +144,46 @@ object SparkEnv extends Logging { env } - private[spark] def create( + /** + * Create a SparkEnv for the driver. + */ + private[spark] def createDriverEnv( + conf: SparkConf, + isLocal: Boolean, + listenerBus: LiveListenerBus): SparkEnv = { + assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!") + assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") + val hostname = conf.get("spark.driver.host") + val port = conf.get("spark.driver.port").toInt + create(conf, "<driver>", hostname, port, true, isLocal, listenerBus) + } + + /** + * Create a SparkEnv for an executor. + * In coarse-grained mode, the executor provides an actor system that is already instantiated. + */ + private[spark] def createExecutorEnv( + conf: SparkConf, + executorId: String, + hostname: String, + port: Int, + isLocal: Boolean, + actorSystem: ActorSystem = null): SparkEnv = { + create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem) + } + + /** + * Helper method to create a SparkEnv for a driver or an executor. + */ + private def create( conf: SparkConf, executorId: String, hostname: String, port: Int, isDriver: Boolean, isLocal: Boolean, - listenerBus: LiveListenerBus = null): SparkEnv = { + listenerBus: LiveListenerBus = null, + defaultActorSystem: ActorSystem = null): SparkEnv = { // Listener bus is only used on the driver if (isDriver) { @@ -159,9 +191,16 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) - val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName - val (actorSystem, boundPort) = AkkaUtils.createActorSystem( - actorSystemName, hostname, port, conf, securityManager) + + // If an existing actor system is already provided, use it. + // This is the case when an executor is launched in coarse-grained mode. + val (actorSystem, boundPort) = + Option(defaultActorSystem) match { + case Some(as) => (as, port) + case None => + val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName + AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager) + } // Figure out which port Akka actually bound to in case the original port is 0 or occupied. // This is so that we tell the executors the correct port to connect to. http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c40a3e1..697154d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import scala.concurrent.Await -import akka.actor.{Actor, ActorSelection, Props} +import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import akka.pattern.Patterns import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent} @@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend( executorId: String, hostPort: String, cores: Int, - sparkProperties: Seq[(String, String)]) + sparkProperties: Seq[(String, String)], + actorSystem: ActorSystem) extends Actor with ActorLogReceive with ExecutorBackend with Logging { Utils.checkHostPort(hostPort, "Expected hostport") @@ -57,8 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? - executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, - false) + val (hostname, _) = Utils.parseHostPort(hostPort) + executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -135,7 +136,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], - driverUrl, executorId, sparkHostPort, cores, props), + driverUrl, executorId, sparkHostPort, cores, props, actorSystem), name = "Executor") workerUrl.foreach { url => actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0b75b9b..70a46c7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,6 +26,8 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal +import akka.actor.ActorSystem + import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ @@ -35,12 +37,14 @@ import org.apache.spark.util.{AkkaUtils, Utils} /** * Spark executor used with Mesos, YARN, and the standalone scheduler. + * In coarse-grained mode, an existing actor system is provided. */ private[spark] class Executor( executorId: String, slaveHostname: String, properties: Seq[(String, String)], - isLocal: Boolean = false) + isLocal: Boolean = false, + actorSystem: ActorSystem = null) extends Logging { // Application dependencies (added through SparkContext) that we've fetched so far on this node. @@ -77,8 +81,9 @@ private[spark] class Executor( conf.set("spark.executor.id", "executor." + executorId) private val env = { if (!isLocal) { - val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, - isDriver = false, isLocal = false) + val port = conf.getInt("spark.executor.port", 0) + val _env = SparkEnv.createExecutorEnv( + conf, executorId, slaveHostname, port, isLocal, actorSystem) SparkEnv.set(_env) _env.metricsSystem.registerSource(executorSource) _env --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org