Repository: spark Updated Branches: refs/heads/master 928d63162 -> 965245d08
[SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors By using the dynamic allocation, sometimes it occurs false killing for those busy executors. Some executors with assignments will be killed because of being idle for enough time (say 60 seconds). The root cause is that the Task-Launch listener event is asynchronized. For example, some executors are under assigning tasks, but not sending out the listener notification yet. Meanwhile, the dynamic allocation's executor idle time is up (e.g., 60 seconds). It will trigger killExecutor event at the same time. 1. the timer expiration starts before the listener event arrives. 2. Then, the task is going to run on top of that killed/killing executor. It will lead to task failure finally. Here is the proposal to fix it. We can add the force control for killExecutor. If the force control is not set (i.e., false), we'd better to check if the executor under killing is idle or busy. If the current executor has some assignment, we should not kill that executor and return back false (to indicate killing failure). In dynamic allocation, we'd better to turn off force killing (i.e., force = false), we will meet killing failure if tries to kill a busy executor. And then, the executor timer won't be invalid. Later on, the task assignment event arrives, we can remove the idle timer accordingly. So that we can avoid false killing for those busy executors in dynamic allocation. For the rest of usages, the end users can decide if to use force killing or not by themselves. If to turn on that option, the killExecutor will do the action without any status checking. Author: Grace <jie.hu...@intel.com> Author: Andrew Or <and...@databricks.com> Author: Jie Huang <jie.hu...@intel.com> Closes #7888 from GraceH/forcekill. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/965245d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/965245d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/965245d0 Branch: refs/heads/master Commit: 965245d087c18edc6c3d5baddeaf83163e32e330 Parents: 928d631 Author: Grace <jie.hu...@intel.com> Authored: Tue Nov 17 15:43:35 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Tue Nov 17 15:43:35 2015 -0800 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 1 + .../scala/org/apache/spark/SparkContext.scala | 4 +- .../spark/scheduler/TaskSchedulerImpl.scala | 27 +++++++--- .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +++-- .../StandaloneDynamicAllocationSuite.scala | 52 +++++++++++++++++++- 5 files changed, 82 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e..6419218 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager( private def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) + executorsPendingToRemove.remove(executorId) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4bbd0b0..b5645b0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1461,7 +1461,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli override def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds) + b.killExecutors(executorIds, replace = false, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false @@ -1499,7 +1499,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(Seq(executorId), replace = true) + b.killExecutors(Seq(executorId), replace = true, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 5f13669..bf0419d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Which executor IDs we have executors on - val activeExecutorIds = new HashSet[String] + // Number of tasks running on each executor + private val executorIdToTaskCount = new HashMap[String, Int] // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host @@ -254,6 +254,7 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId + executorIdToTaskCount(execId) += 1 executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) @@ -282,7 +283,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host - activeExecutorIds += o.executorId + executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -331,7 +332,8 @@ private[spark] class TaskSchedulerImpl( if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { // We lost this entire executor, so remember that it's gone val execId = taskIdToExecutorId(tid) - if (activeExecutorIds.contains(execId)) { + + if (executorIdToTaskCount.contains(execId)) { removeExecutor(execId, SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) failedExecutor = Some(execId) @@ -341,7 +343,11 @@ private[spark] class TaskSchedulerImpl( case Some(taskSet) => if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) - taskIdToExecutorId.remove(tid) + taskIdToExecutorId.remove(tid).foreach { execId => + if (executorIdToTaskCount.contains(execId)) { + executorIdToTaskCount(execId) -= 1 + } + } } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) @@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - if (activeExecutorIds.contains(executorId)) { + if (executorIdToTaskCount.contains(executorId)) { val hostPort = executorIdToHost(executorId) logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason)) removeExecutor(executorId, reason) @@ -498,7 +504,8 @@ private[spark] class TaskSchedulerImpl( * of any running tasks, since the loss reason defines whether we'll fail those tasks. */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { - activeExecutorIds -= executorId + executorIdToTaskCount -= executorId + val host = executorIdToHost(executorId) val execs = executorsByHost.getOrElse(host, new HashSet) execs -= executorId @@ -535,7 +542,11 @@ private[spark] class TaskSchedulerImpl( } def isExecutorAlive(execId: String): Boolean = synchronized { - activeExecutorIds.contains(execId) + executorIdToTaskCount.contains(execId) + } + + def isExecutorBusy(execId: String): Boolean = synchronized { + executorIdToTaskCount.getOrElse(execId, -1) > 0 } // By default, rack is unknown http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3373caf..6f0c910 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -453,7 +453,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. */ final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized { - killExecutors(executorIds, replace = false) + killExecutors(executorIds, replace = false, force = false) } /** @@ -461,9 +461,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * * @param executorIds identifiers of executors to kill * @param replace whether to replace the killed executors with new ones + * @param force whether to force kill busy executors * @return whether the kill request is acknowledged. */ - final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized { + final def killExecutors( + executorIds: Seq[String], + replace: Boolean, + force: Boolean): Boolean = synchronized { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) unknownExecutors.foreach { id => @@ -471,7 +475,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } // If an executor is already pending to be removed, do not kill it again (SPARK-9795) - val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) } + // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552) + val executorsToKill = knownExecutors + .filter { id => !executorsPendingToRemove.contains(id) } + .filter { id => force || !scheduler.isExecutorBusy(id) } executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors http://git-wip-us.apache.org/repos/asf/spark/blob/965245d0/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index d145e78..2fa795f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -17,10 +17,11 @@ package org.apache.spark.deploy +import scala.collection.mutable import scala.concurrent.duration._ import org.mockito.Mockito.{mock, when} -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ @@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor @@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE class StandaloneDynamicAllocationSuite extends SparkFunSuite with LocalSparkContext - with BeforeAndAfterAll { + with BeforeAndAfterAll + with PrivateMethodTester { private val numWorkers = 2 private val conf = new SparkConf() @@ -404,6 +407,41 @@ class StandaloneDynamicAllocationSuite assert(apps.head.getExecutorLimit === 1) } + test("disable force kill for busy executors (SPARK-9552)") { + sc = new SparkContext(appConf) + val appId = sc.applicationId + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.size === 1) + assert(apps.head.id === appId) + assert(apps.head.executors.size === 2) + assert(apps.head.getExecutorLimit === Int.MaxValue) + } + var apps = getApplications() + // sync executors between the Master and the driver, needed because + // the driver refuses to kill executors it does not know about + syncExecutors(sc) + val executors = getExecutorIds(sc) + assert(executors.size === 2) + + // simulate running a task on the executor + val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount) + val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl] + val executorIdToTaskCount = taskScheduler invokePrivate getMap() + executorIdToTaskCount(executors.head) = 1 + // kill the busy executor without force; this should fail + assert(killExecutor(sc, executors.head, force = false)) + apps = getApplications() + assert(apps.head.executors.size === 2) + + // force kill busy executor + assert(killExecutor(sc, executors.head, force = true)) + apps = getApplications() + // kill executor successfully + assert(apps.head.executors.size === 1) + + } + // =============================== // | Utility methods for testing | // =============================== @@ -455,6 +493,16 @@ class StandaloneDynamicAllocationSuite sc.killExecutors(getExecutorIds(sc).take(n)) } + /** Kill the given executor, specifying whether to force kill it. */ + private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = { + syncExecutors(sc) + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutors(Seq(executorId), replace = false, force) + case _ => fail("expected coarse grained scheduler") + } + } + /** * Return a list of executor IDs belonging to this application. * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org