Repository: spark
Updated Branches:
  refs/heads/branch-1.5 166fdf4e3 -> 13920d5fe


[SPARK-10515] When killing executor, the pending replacement executors should 
not be lost

If the heartbeat receiver kills executors (and new ones are not registered to 
replace them), the idle timeout for the old executors will be lost (which then 
changes the total number of executors requested by the driver), so no new 
executors will be asked to replace them.
For example, executorsPendingToRemove=Set(1), and executor 2 hits its idle 
timeout before a new executor is asked to replace executor 1. Then the driver 
kills executor 2 and sends RequestExecutors to the AM. But 
executorsPendingToRemove=Set(1,2), so the AM doesn't allocate an executor to 
replace executor 1.

see: https://github.com/apache/spark/pull/8668

Author: KaiXinXiaoLei <huleil...@huawei.com>
Author: huleilei <huleil...@huawei.com>

Closes #8945 from KaiXinXiaoLei/pendingexecutor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13920d5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13920d5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13920d5f

Branch: refs/heads/branch-1.5
Commit: 13920d5fecbb8f1aaeb8a5013b1062b91cbabc7b
Parents: 166fdf4
Author: KaiXinXiaoLei <huleil...@huawei.com>
Authored: Thu Oct 15 14:48:01 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Oct 15 14:48:46 2015 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 ++
 .../StandaloneDynamicAllocationSuite.scala      | 35 ++++++++++++++++++++
 2 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13920d5f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87..6ae8fed 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -432,6 +432,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     if (!replace) {
       doRequestTotalExecutors(
         numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size)
+    } else {
+      numPendingExecutors += knownExecutors.size
     }
 
     doKillExecutors(executorsToKill)

http://git-wip-us.apache.org/repos/asf/spark/blob/13920d5f/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2e2fa22..d145e78 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -369,6 +369,41 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1)
   }
 
+  test("the pending replacement executors should not be lost (SPARK-10515)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+    // kill executor 1, and replace it
+    assert(sc.killAndReplaceExecutor(executors.head))
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.head.executors.size === 2)
+    }
+
+    var apps = getApplications()
+    // kill executor 1
+    assert(sc.killExecutor(executors.head))
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
+    // kill executor 2
+    assert(sc.killExecutor(executors(1)))
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to