Repository: spark
Updated Branches:
  refs/heads/branch-1.2 075b399c5 -> ca37639aa


[SPARK-4754] Refactor SparkContext into ExecutorAllocationClient

This is so that the `ExecutorAllocationManager` does not take in the 
`SparkContext`, with all of its dependencies, as an argument. This prevents 
future developers of this class from tying it even more tightly to the 
`SparkContext`, which has really become quite a monstrous object.

cc'ing pwendell who originally suggested this, and JoshRosen who may have 
thoughts about the trait mix-in style of `SparkContext`.

Author: Andrew Or <and...@databricks.com>

Closes #3614 from andrewor14/dynamic-allocation-sc and squashes the following 
commits:

187070d [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
dynamic-allocation-sc
59baf6c [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
dynamic-allocation-sc
347a348 [Andrew Or] Refactor SparkContext into ExecutorAllocationClient

(cherry picked from commit 9804a759b68f56eceb8a2f4ea90f76a92b5f9f67)
Signed-off-by: Andrew Or <and...@databricks.com>

Conflicts:
        core/src/main/scala/org/apache/spark/SparkContext.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca37639a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca37639a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca37639a

Branch: refs/heads/branch-1.2
Commit: ca37639aa1b537d0f9b56bf1362bf293635e235c
Parents: 075b399
Author: Andrew Or <and...@databricks.com>
Authored: Thu Dec 18 17:37:42 2014 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Dec 18 17:42:02 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala | 42 ++++++++++++++++++++
 .../spark/ExecutorAllocationManager.scala       | 14 ++++---
 .../scala/org/apache/spark/SparkContext.scala   | 10 ++---
 .../cluster/CoarseGrainedSchedulerBackend.scala |  8 ++--
 4 files changed, 59 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
new file mode 100644
index 0000000..a46a81e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * A client that communicates with the cluster manager to request or kill 
executors.
+ */
+private[spark] trait ExecutorAllocationClient {
+
+  /**
+   * Request an additional number of executors from the cluster manager.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def requestExecutors(numAdditionalExecutors: Int): Boolean
+
+  /**
+   * Request that the cluster manager kill the specified executors.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutors(executorIds: Seq[String]): Boolean
+
+  /**
+   * Request that the cluster manager kill the specified executor.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutor(executorId: String): Boolean = 
killExecutors(Seq(executorId))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 88adb89..e9e90e3 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -60,11 +60,13 @@ import org.apache.spark.scheduler._
  *   spark.dynamicAllocation.executorIdleTimeout (K) -
  *     If an executor has been idle for this duration, remove it
  */
-private[spark] class ExecutorAllocationManager(sc: SparkContext) extends 
Logging {
+private[spark] class ExecutorAllocationManager(
+    client: ExecutorAllocationClient,
+    listenerBus: LiveListenerBus,
+    conf: SparkConf)
+  extends Logging {
   import ExecutorAllocationManager._
 
-  private val conf = sc.conf
-
   // Lower and upper bounds on the number of executors. These are required.
   private val minNumExecutors = 
conf.getInt("spark.dynamicAllocation.minExecutors", -1)
   private val maxNumExecutors = 
conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
@@ -168,7 +170,7 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
    * Register for scheduler callbacks to decide when to add and remove 
executors.
    */
   def start(): Unit = {
-    sc.addSparkListener(listener)
+    listenerBus.addListener(listener)
     startPolling()
   }
 
@@ -253,7 +255,7 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
     val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, 
maxNumExecutorsToAdd)
 
     val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || 
sc.requestExecutors(actualNumExecutorsToAdd)
+    val addRequestAcknowledged = testing || 
client.requestExecutors(actualNumExecutorsToAdd)
     if (addRequestAcknowledged) {
       logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
         s"tasks are backlogged (new desired total will be $newTotalExecutors)")
@@ -295,7 +297,7 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
     }
 
     // Send a request to the backend to kill this executor
-    val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+    val removeRequestAcknowledged = testing || client.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
         s"$executorIdleTimeout seconds (new desired total will be 
${numExistingExecutors - 1})")

http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 32191c6..9d3d1e1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -64,7 +64,7 @@ import org.apache.spark.util._
  * @param config a Spark Config object describing the application 
configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationClient {
 
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
@@ -361,7 +361,7 @@ class SparkContext(config: SparkConf) extends Logging {
   // Optionally scale number of executors dynamically based on workload. 
Exposed for testing.
   private[spark] val executorAllocationManager: 
Option[ExecutorAllocationManager] =
     if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-      Some(new ExecutorAllocationManager(this))
+      Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
       None
     }
@@ -990,7 +990,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request 
is received.
    */
   @DeveloperApi
-  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1006,7 +1006,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request 
is received.
    */
   @DeveloperApi
-  def killExecutors(executorIds: Seq[String]): Boolean = {
+  override def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(executorIds)
@@ -1022,7 +1022,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request 
is received.
    */
   @DeveloperApi
-  def killExecutor(executorId: String): Boolean = 
killExecutors(Seq(executorId))
+  override def killExecutor(executorId: String): Boolean = 
super.killExecutor(executorId)
 
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION

http://git-wip-us.apache.org/repos/asf/spark/blob/ca37639a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 29cd344..fe9914b 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
+import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, 
SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, 
TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, 
Utils}
@@ -42,7 +42,7 @@ import org.apache.spark.util.{ActorLogReceive, 
SerializableBuffer, AkkaUtils, Ut
  */
 private[spark]
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val 
actorSystem: ActorSystem)
-  extends SchedulerBackend with Logging
+  extends ExecutorAllocationClient with SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for 
simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
@@ -307,7 +307,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val actorSyste
    * Request an additional number of executors from the cluster manager.
    * Return whether the request is acknowledged.
    */
-  final def requestExecutors(numAdditionalExecutors: Int): Boolean = 
synchronized {
+  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = 
synchronized {
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from 
the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
     numPendingExecutors += numAdditionalExecutors
@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val actorSyste
    * Request that the cluster manager kill the specified executors.
    * Return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+  final override def killExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val filteredExecutorIds = new ArrayBuffer[String]
     executorIds.foreach { id =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to