Repository: spark Updated Branches: refs/heads/branch-1.5 0579f28df -> bc4ac65d4
[SPARK-9795] Dynamic allocation: avoid double counting when killing same executor twice This is based on KaiXinXiaoLei's changes in #7716. The issue is that when someone calls `sc.killExecutor("1")` on the same executor twice quickly, then the executor target will be adjusted downwards by 2 instead of 1 even though we're only actually killing one executor. In certain cases where we don't adjust the target back upwards quickly, we'll end up with jobs hanging. This is a common danger because there are many places where this is called: - `HeartbeatReceiver` kills an executor that has not been sending heartbeats - `ExecutorAllocationManager` kills an executor that has been idle - The user code might call this, which may interfere with the previous callers While it's not clear whether this fixes SPARK-9745, fixing this potential race condition seems like a strict improvement. I've added a regression test to illustrate the issue. Author: Andrew Or <and...@databricks.com> Closes #8078 from andrewor14/da-double-kill. (cherry picked from commit be5d1912076c2ffd21ec88611e53d3b3c59b7ecc) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc4ac65d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc4ac65d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc4ac65d Branch: refs/heads/branch-1.5 Commit: bc4ac65d4c0fed93c70582fc74574c5b70aa842d Parents: 0579f28 Author: Andrew Or <and...@databricks.com> Authored: Wed Aug 12 09:24:50 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Wed Aug 12 09:24:58 2015 -0700 ---------------------------------------------------------------------- .../cluster/CoarseGrainedSchedulerBackend.scala | 11 +++++++---- .../StandaloneDynamicAllocationSuite.scala | 20 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bc4ac65d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 6acf8a9..5730a87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -422,16 +422,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logWarning(s"Executor to kill $id does not exist!") } + // If an executor is already pending to be removed, do not kill it again (SPARK-9795) + val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) } + executorsPendingToRemove ++= executorsToKill + // If we do not wish to replace the executors we kill, sync the target number of executors // with the cluster manager to avoid allocating new ones. When computing the new target, // take into account executors that are pending to be added or removed. if (!replace) { - doRequestTotalExecutors(numExistingExecutors + numPendingExecutors - - executorsPendingToRemove.size - knownExecutors.size) + doRequestTotalExecutors( + numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size) } - executorsPendingToRemove ++= knownExecutors - doKillExecutors(knownExecutors) + doKillExecutors(executorsToKill) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/bc4ac65d/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 08c41a8..1f2a0f0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -283,6 +283,26 @@ class StandaloneDynamicAllocationSuite assert(master.apps.head.getExecutorLimit === 1000) } + test("kill the same executor twice (SPARK-9795)") { + sc = new SparkContext(appConf) + val appId = sc.applicationId + assert(master.apps.size === 1) + assert(master.apps.head.id === appId) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.getExecutorLimit === Int.MaxValue) + // sync executors between the Master and the driver, needed because + // the driver refuses to kill executors it does not know about + syncExecutors(sc) + // kill the same executor twice + val executors = getExecutorIds(sc) + assert(executors.size === 2) + assert(sc.killExecutor(executors.head)) + assert(sc.killExecutor(executors.head)) + assert(master.apps.head.executors.size === 1) + // The limit should not be lowered twice + assert(master.apps.head.getExecutorLimit === 1) + } + // =============================== // | Utility methods for testing | // =============================== --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org