Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0579f28df -> bc4ac65d4


[SPARK-9795] Dynamic allocation: avoid double counting when killing same executor twice

This is based on KaiXinXiaoLei's changes in #7716.

The issue is that when someone calls `sc.killExecutor("1")` twice in quick 
succession on the same executor, the executor target is adjusted downwards by 
2 instead of 1, even though we're only actually killing one executor. In 
cases where we don't adjust the target back upwards quickly, we'll end up 
with jobs hanging.

This is a real danger because kill requests come from many places:
- `HeartbeatReceiver` kills an executor that has not been sending heartbeats
- `ExecutorAllocationManager` kills an executor that has been idle
- The user code might call this, which may interfere with the previous callers

While it's not clear whether this fixes SPARK-9745, fixing this potential 
race condition seems like a strict improvement. I've added a regression test 
that illustrates the issue.
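
The patch below makes the kill path idempotent by deduplicating against 
`executorsPendingToRemove` before adjusting the target. As a standalone 
sketch of that bookkeeping (using hypothetical simplified state, not the 
real `CoarseGrainedSchedulerBackend` fields), the logic is:

```scala
import scala.collection.mutable

// Minimal sketch (assumed names, not Spark's actual classes) of the
// idempotent-kill bookkeeping: killing the same executor twice must
// lower the requested executor target only once.
object KillBookkeeping {
  var numExistingExecutors = 2
  val numPendingExecutors = 0
  val executorsPendingToRemove = mutable.HashSet.empty[String]
  var requestedTarget: Int = -1

  def killExecutors(knownExecutors: Seq[String], replace: Boolean = false): Unit = {
    // Skip executors whose removal is already in flight (SPARK-9795),
    // so each executor is counted in executorsPendingToRemove at most once.
    val executorsToKill = knownExecutors.filter(id => !executorsPendingToRemove.contains(id))
    executorsPendingToRemove ++= executorsToKill
    if (!replace) {
      // The target now subtracts each pending removal exactly once.
      requestedTarget = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
    }
  }
}
```

With this, calling `killExecutors(Seq("1"))` twice leaves the target at 1 
rather than dropping it to 0, which is exactly the behavior the regression 
test below asserts via `getExecutorLimit`.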

Author: Andrew Or <and...@databricks.com>

Closes #8078 from andrewor14/da-double-kill.

(cherry picked from commit be5d1912076c2ffd21ec88611e53d3b3c59b7ecc)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc4ac65d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc4ac65d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc4ac65d

Branch: refs/heads/branch-1.5
Commit: bc4ac65d4c0fed93c70582fc74574c5b70aa842d
Parents: 0579f28
Author: Andrew Or <and...@databricks.com>
Authored: Wed Aug 12 09:24:50 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Aug 12 09:24:58 2015 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala | 11 +++++++----
 .../StandaloneDynamicAllocationSuite.scala      | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc4ac65d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6acf8a9..5730a87 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -422,16 +422,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       logWarning(s"Executor to kill $id does not exist!")
     }
 
+    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+    val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+    executorsPendingToRemove ++= executorsToKill
+
     // If we do not wish to replace the executors we kill, sync the target number of executors
     // with the cluster manager to avoid allocating new ones. When computing the new target,
     // take into account executors that are pending to be added or removed.
     if (!replace) {
-      doRequestTotalExecutors(numExistingExecutors + numPendingExecutors
-        - executorsPendingToRemove.size - knownExecutors.size)
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
     }
 
-    executorsPendingToRemove ++= knownExecutors
-    doKillExecutors(knownExecutors)
+    doKillExecutors(executorsToKill)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bc4ac65d/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 08c41a8..1f2a0f0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -283,6 +283,26 @@ class StandaloneDynamicAllocationSuite
     assert(master.apps.head.getExecutorLimit === 1000)
   }
 
+  test("kill the same executor twice (SPARK-9795)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    // kill the same executor twice
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+    assert(sc.killExecutor(executors.head))
+    assert(sc.killExecutor(executors.head))
+    assert(master.apps.head.executors.size === 1)
+    // The limit should not be lowered twice
+    assert(master.apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================

