Repository: spark
Updated Branches:
  refs/heads/branch-1.2 075b399c5 -> ca37639aa
[SPARK-4754] Refactor SparkContext into ExecutorAllocationClient

This is so that the `ExecutorAllocationManager` does not take in the `SparkContext` with all of its dependencies as an argument. This prevents future developers of this class from tying it down further to the `SparkContext`, which has really become quite a monstrous object.

cc'ing pwendell who originally suggested this, and JoshRosen who may have thoughts about the trait mix-in style of `SparkContext`.

Author: Andrew Or <and...@databricks.com>

Closes #3614 from andrewor14/dynamic-allocation-sc and squashes the following commits:

187070d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc
59baf6c [Andrew Or] Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc
347a348 [Andrew Or] Refactor SparkContext into ExecutorAllocationClient

(cherry picked from commit 9804a759b68f56eceb8a2f4ea90f76a92b5f9f67)
Signed-off-by: Andrew Or <and...@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca37639a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca37639a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca37639a

Branch: refs/heads/branch-1.2
Commit: ca37639aa1b537d0f9b56bf1362bf293635e235c
Parents: 075b399
Author: Andrew Or <and...@databricks.com>
Authored: Thu Dec 18 17:37:42 2014 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Dec 18 17:42:02 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala | 42 ++++++++++++++++++++
 .../spark/ExecutorAllocationManager.scala       | 14 ++++---
 .../scala/org/apache/spark/SparkContext.scala   | 10 ++---
 .../cluster/CoarseGrainedSchedulerBackend.scala |  8 ++--
 4 files changed, 59 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
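The dependency inversion described in the commit message, reduced to a minimal self-contained sketch. All names here are illustrative stand-ins, not Spark's: `Client` for `ExecutorAllocationClient`, `Manager` for `ExecutorAllocationManager`, `Context` for `SparkContext`:

    // The manager now depends on a narrow trait rather than the whole context.
    trait Client {
      def requestExecutors(numAdditionalExecutors: Int): Boolean
    }

    // The manager can only reach the operations it actually needs.
    class Manager(client: Client) {
      def addExecutors(n: Int): Boolean = client.requestExecutors(n)
    }

    // The context mixes in the trait and hands itself to the manager
    // as a client only, hiding its other dependencies.
    class Context extends Client {
      override def requestExecutors(numAdditionalExecutors: Int): Boolean = true
      val manager = new Manager(this)
    }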
http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
new file mode 100644
index 0000000..a46a81e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * A client that communicates with the cluster manager to request or kill executors.
+ */
+private[spark] trait ExecutorAllocationClient {
+
+  /**
+   * Request an additional number of executors from the cluster manager.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def requestExecutors(numAdditionalExecutors: Int): Boolean
+
+  /**
+   * Request that the cluster manager kill the specified executors.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutors(executorIds: Seq[String]): Boolean
+
+  /**
+   * Request that the cluster manager kill the specified executor.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+}
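Since `ExecutorAllocationClient` is `private[spark]`, anything implementing it must live under the `org.apache.spark` package. A hypothetical test double (not part of this commit) that records requests instead of contacting a cluster manager; `killExecutor` comes for free from the trait's default implementation, so only the two abstract methods are needed:

    package org.apache.spark

    // Hypothetical stub: always acknowledges, and remembers what was asked of it.
    private[spark] class FakeExecutorAllocationClient extends ExecutorAllocationClient {
      var executorsRequested = 0
      var executorsKilled = Seq.empty[String]

      override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
        executorsRequested += numAdditionalExecutors
        true
      }

      override def killExecutors(executorIds: Seq[String]): Boolean = {
        executorsKilled ++= executorIds
        true
      }
    }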
http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 88adb89..e9e90e3 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -60,11 +60,13 @@ import org.apache.spark.scheduler._
  *   spark.dynamicAllocation.executorIdleTimeout (K) -
  *     If an executor has been idle for this duration, remove it
  */
-private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+private[spark] class ExecutorAllocationManager(
+    client: ExecutorAllocationClient,
+    listenerBus: LiveListenerBus,
+    conf: SparkConf)
+  extends Logging {
   import ExecutorAllocationManager._
 
-  private val conf = sc.conf
-
   // Lower and upper bounds on the number of executors. These are required.
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
@@ -168,7 +170,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
    * Register for scheduler callbacks to decide when to add and remove executors.
    */
   def start(): Unit = {
-    sc.addSparkListener(listener)
+    listenerBus.addListener(listener)
     startPolling()
   }
 
@@ -253,7 +255,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
 
     val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
     val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
+    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
     if (addRequestAcknowledged) {
       logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
         s"tasks are backlogged (new desired total will be $newTotalExecutors)")
@@ -295,7 +297,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     }
 
     // Send a request to the backend to kill this executor
-    val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+    val removeRequestAcknowledged = testing || client.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
         s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
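With its three collaborators now passed in explicitly, an `ExecutorAllocationManager` can be wired up without a `SparkContext` at all. A sketch of what a test might do, assuming the fake client above, code in the `org.apache.spark` package, that `new LiveListenerBus` takes no arguments on this branch (as when `SparkContext` constructs one), and that `spark.dynamicAllocation.testing` is the conf key behind the `testing` checks in the hunks above:

    import org.apache.spark.scheduler.LiveListenerBus

    // The min/max bounds are required, per the code above.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      .set("spark.dynamicAllocation.testing", "true")  // skip real cluster-manager calls
    val client = new FakeExecutorAllocationClient
    val manager = new ExecutorAllocationManager(client, new LiveListenerBus, conf)
    manager.start()  // registers the listener on the bus and starts polling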
http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 32191c6..9d3d1e1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -64,7 +64,7 @@ import org.apache.spark.util._
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
 
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
@@ -361,7 +361,7 @@ class SparkContext(config: SparkConf) extends Logging {
   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
   private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
     if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-      Some(new ExecutorAllocationManager(this))
+      Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
       None
     }
@@ -990,7 +990,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1006,7 +1006,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutors(executorIds: Seq[String]): Boolean = {
+  override def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(executorIds)
@@ -1022,7 +1022,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+  override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
 
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
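Because `SparkContext` now mixes in the trait, the `@DeveloperApi` methods above double as its `ExecutorAllocationClient` implementation, so application code can drive allocation directly. A hypothetical usage sketch (only honored in YARN mode, per the scaladoc; the app name and executor id are made up):

    val sc = new SparkContext(
      new SparkConf().setAppName("scaling-demo").setMaster("yarn-client"))

    if (sc.requestExecutors(2)) {
      println("cluster manager acknowledged the request for 2 more executors")
    }
    sc.killExecutor("3")  // routed through killExecutors(Seq("3")) via the trait default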
http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 29cd344..fe9914b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
+import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
@@ -42,7 +42,7 @@ import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Ut
  */
 private[spark]
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
-  extends SchedulerBackend with Logging
+  extends ExecutorAllocationClient with SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
@@ -307,7 +307,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
    * Request an additional number of executors from the cluster manager.
    * Return whether the request is acknowledged.
    */
-  final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
     numPendingExecutors += numAdditionalExecutors
@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
    * Request that the cluster manager kill the specified executors.
    * Return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+  final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val filteredExecutorIds = new ArrayBuffer[String]
     executorIds.foreach { id =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org