Repository: spark
Updated Branches:
  refs/heads/master 928d63162 -> 965245d08


[SPARK-9552] Add force control for killExecutors to avoid falsely killing
busy executors

With dynamic allocation enabled, busy executors are sometimes killed by mistake:
executors that have just been assigned tasks are removed for having been "idle"
long enough (say 60 seconds). The root cause is that the task-launch listener
event is delivered asynchronously.

For example, some executors are in the middle of having tasks assigned to them
but have not yet emitted the corresponding listener notification. Meanwhile, the
dynamic allocation executor idle timeout (e.g., 60 seconds) expires and triggers
a killExecutor request at the same time:
 1. The idle timer expires before the listener event arrives.
 2. The task then ends up running on an executor that is being (or has already
    been) killed, which ultimately leads to a task failure.
The dynamic-allocation settings involved are sketched below.
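
A minimal configuration sketch of those settings (values are illustrative only;
the idle timeout is what drives the kill timer described above):

    // Illustrative dynamic-allocation configuration, not part of this patch.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      // executors idle longer than this become candidates for removal
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
      // dynamic allocation relies on the external shuffle service
      .set("spark.shuffle.service.enabled", "true")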

Here is the proposal to fix it: add a force flag to killExecutor(s). If force is
not set (i.e., false), we first check whether the executor to be killed is idle
or busy; if it currently has tasks assigned, we do not kill it and return false
to indicate that the kill was refused. Dynamic allocation turns force killing
off (force = false), so an attempt to kill a busy executor simply fails and the
executor stays alive; when the task-assignment listener event arrives later, the
idle timer is cleared as usual. This way, busy executors are no longer falsely
killed under dynamic allocation.
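
As a condensed sketch of that per-executor decision (mirroring the
CoarseGrainedSchedulerBackend change in the diff below; a simplified standalone
function, not the patched method itself):

    // Should a kill request proceed for this executor id?
    def shouldKill(
        id: String,
        force: Boolean,
        pendingToRemove: Set[String],
        isBusy: String => Boolean): Boolean = {
      !pendingToRemove.contains(id) &&   // already pending removal? skip it (SPARK-9795)
      (force || !isBusy(id))             // busy executors survive unless force = true
    }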

For all other usages, end users can decide for themselves whether to use force
killing. With that option turned on, killExecutor(s) performs the kill without
any busy/idle check, as in the usage sketch below.
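
For illustration, a sketch of how a caller could choose between the two modes at
the backend level (modeled on the test helper added below; it assumes access to
the private[spark] schedulerBackend field, e.g. from code living in the
org.apache.spark package):

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

    // force = false: the request is dropped for a busy executor
    // force = true : kill unconditionally, as before this change
    def requestKill(sc: SparkContext, executorId: String, force: Boolean): Boolean =
      sc.schedulerBackend match {
        case b: CoarseGrainedSchedulerBackend =>
          b.killExecutors(Seq(executorId), replace = false, force = force)
        case _ => false   // only the coarse-grained backend supports killing executors
      }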

Author: Grace <jie.hu...@intel.com>
Author: Andrew Or <and...@databricks.com>
Author: Jie Huang <jie.hu...@intel.com>

Closes #7888 from GraceH/forcekill.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/965245d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/965245d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/965245d0

Branch: refs/heads/master
Commit: 965245d087c18edc6c3d5baddeaf83163e32e330
Parents: 928d631
Author: Grace <jie.hu...@intel.com>
Authored: Tue Nov 17 15:43:35 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Nov 17 15:43:35 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  1 +
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     | 27 +++++++---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +++--
 .../StandaloneDynamicAllocationSuite.scala      | 52 +++++++++++++++++++-
 5 files changed, 82 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b93536e..6419218 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorBusy(executorId: String): Unit = synchronized {
     logDebug(s"Clearing idle timer for $executorId because it is now running a 
task")
     removeTimes.remove(executorId)
+    executorsPendingToRemove.remove(executorId)
   }
 
   /**
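
In other words, once the (asynchronous) task-start notification finally reaches
the allocation manager, both the idle timer and any pending-remove record for
that executor are cleared. A self-contained sketch of that bookkeeping (field
names borrowed from the hunk above; this is not the real class):

    import scala.collection.mutable

    // Simplified stand-in for the allocation manager's idle/remove bookkeeping.
    class IdleTracker {
      private val removeTimes = new mutable.HashMap[String, Long]
      private val executorsPendingToRemove = new mutable.HashSet[String]

      def onExecutorBusy(executorId: String): Unit = synchronized {
        removeTimes.remove(executorId)               // cancel the idle-kill timer
        executorsPendingToRemove.remove(executorId)  // in use again, so no longer pending removal
      }
    }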

http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4bbd0b0..b5645b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1461,7 +1461,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   override def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(executorIds)
+        b.killExecutors(executorIds, replace = false, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained 
mode")
         false
@@ -1499,7 +1499,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(Seq(executorId), replace = true)
+        b.killExecutors(Seq(executorId), replace = true, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained 
mode")
         false

http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5f13669..bf0419d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Which executor IDs we have executors on
-  val activeExecutorIds = new HashSet[String]
+  // Number of tasks running on each executor
+  private val executorIdToTaskCount = new HashMap[String, Int]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
@@ -254,6 +254,7 @@ private[spark] class TaskSchedulerImpl(
             val tid = task.taskId
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
+            executorIdToTaskCount(execId) += 1
             executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert(availableCpus(i) >= 0)
@@ -282,7 +283,7 @@ private[spark] class TaskSchedulerImpl(
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
-      activeExecutorIds += o.executorId
+      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
@@ -331,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)
-          if (activeExecutorIds.contains(execId)) {
+
+          if (executorIdToTaskCount.contains(execId)) {
             removeExecutor(execId,
               SlaveLost(s"Task $tid was lost, so marking the executor as lost 
as well."))
             failedExecutor = Some(execId)
@@ -341,7 +343,11 @@ private[spark] class TaskSchedulerImpl(
           case Some(taskSet) =>
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
-              taskIdToExecutorId.remove(tid)
+              taskIdToExecutorId.remove(tid).foreach { execId =>
+                if (executorIdToTaskCount.contains(execId)) {
+                  executorIdToTaskCount(execId) -= 1
+                }
+              }
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
@@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
     var failedExecutor: Option[String] = None
 
     synchronized {
-      if (activeExecutorIds.contains(executorId)) {
+      if (executorIdToTaskCount.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logError("Lost executor %s on %s: %s".format(executorId, hostPort, 
reason))
         removeExecutor(executorId, reason)
@@ -498,7 +504,8 @@ private[spark] class TaskSchedulerImpl(
    * of any running tasks, since the loss reason defines whether we'll fail those tasks.
    */
   private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
-    activeExecutorIds -= executorId
+    executorIdToTaskCount -= executorId
+
     val host = executorIdToHost(executorId)
     val execs = executorsByHost.getOrElse(host, new HashSet)
     execs -= executorId
@@ -535,7 +542,11 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def isExecutorAlive(execId: String): Boolean = synchronized {
-    activeExecutorIds.contains(execId)
+    executorIdToTaskCount.contains(execId)
+  }
+
+  def isExecutorBusy(execId: String): Boolean = synchronized {
+    executorIdToTaskCount.getOrElse(execId, -1) > 0
   }
 
   // By default, rack is unknown
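
The replacement of activeExecutorIds with a per-executor task count gives the
scheduler a cheap busy/idle test. A small usage sketch (killableWithoutForce is
a hypothetical helper; both scheduler methods appear in the diff above, and
TaskSchedulerImpl is private[spark], so this assumes code inside the
org.apache.spark package):

    import org.apache.spark.scheduler.TaskSchedulerImpl

    // An executor can be killed without force only if it is known and currently idle.
    def killableWithoutForce(scheduler: TaskSchedulerImpl, execId: String): Boolean =
      scheduler.isExecutorAlive(execId) && !scheduler.isExecutorBusy(execId)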

http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3373caf..6f0c910 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -453,7 +453,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @return whether the kill request is acknowledged.
    */
   final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
-    killExecutors(executorIds, replace = false)
+    killExecutors(executorIds, replace = false, force = false)
   }
 
   /**
@@ -461,9 +461,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    *
    * @param executorIds identifiers of executors to kill
    * @param replace whether to replace the killed executors with new ones
+   * @param force whether to force kill busy executors
    * @return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
+  final def killExecutors(
+      executorIds: Seq[String],
+      replace: Boolean,
+      force: Boolean): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
     unknownExecutors.foreach { id =>
@@ -471,7 +475,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+    val executorsToKill = knownExecutors
+      .filter { id => !executorsPendingToRemove.contains(id) }
+      .filter { id => force || !scheduler.isExecutorBusy(id) }
     executorsPendingToRemove ++= executorsToKill
 
     // If we do not wish to replace the executors we kill, sync the target number of executors

http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d145e78..2fa795f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 
@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
 class StandaloneDynamicAllocationSuite
   extends SparkFunSuite
   with LocalSparkContext
-  with BeforeAndAfterAll {
+  with BeforeAndAfterAll
+  with PrivateMethodTester {
 
   private val numWorkers = 2
   private val conf = new SparkConf()
@@ -404,6 +407,41 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1)
   }
 
+  test("disable force kill for busy executors (SPARK-9552)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
+    var apps = getApplications()
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+
+    // simulate running a task on the executor
+    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+    val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+    val executorIdToTaskCount = taskScheduler invokePrivate getMap()
+    executorIdToTaskCount(executors.head) = 1
+    // kill the busy executor without force; this should fail
+    assert(killExecutor(sc, executors.head, force = false))
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+
+    // force kill busy executor
+    assert(killExecutor(sc, executors.head, force = true))
+    apps = getApplications()
+    // kill executor successfully
+    assert(apps.head.executors.size === 1)
+
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
@@ -455,6 +493,16 @@ class StandaloneDynamicAllocationSuite
     sc.killExecutors(getExecutorIds(sc).take(n))
   }
 
+  /** Kill the given executor, specifying whether to force kill it. */
+  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+    syncExecutors(sc)
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(Seq(executorId), replace = false, force)
+      case _ => fail("expected coarse grained scheduler")
+    }
+  }
+
   /**
    * Return a list of executor IDs belonging to this application.
    *

