Repository: spark
Updated Branches:
  refs/heads/branch-2.2 f1accc851 -> f5ede0d55


[SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run

## What changes were proposed in this pull request?

Right now Spark lets go of executors once they have been idle for 60s (or a configurable timeout). I have seen Spark release executors that were idle but still really needed, for example when the scheduler was waiting for node locality, which can take longer than the default idle timeout. In those jobs the number of executors drops very low (fewer than 10) while something like 80,000 tasks are still waiting to run.
We should consider not allowing executors to idle timeout if they are still needed according to the number of tasks remaining to run.
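
As a rough illustration of that rule (the actual change is in the diff below), removal would have to clear two bars instead of one. A minimal sketch, with a hypothetical helper name; `newExecutorTotal`, `minNumExecutors`, and `numExecutorsTarget` follow the names used in `ExecutorAllocationManager`:

```
// Sketch only: an idle executor may be removed only if the pool stays at or
// above both the configured minimum and the current target derived from
// outstanding tasks.
def mayRemoveIdleExecutor(
    newExecutorTotal: Int,   // executors counted before this removal
    minNumExecutors: Int,    // spark.dynamicAllocation.minExecutors
    numExecutorsTarget: Int  // target computed from pending and running tasks
  ): Boolean = {
  newExecutorTotal - 1 >= minNumExecutors &&
    newExecutorTotal - 1 >= numExecutorsTarget
}
```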

## How was this patch tested?

Tested by manually adding executors to the `executorIdsToBeRemoved` list and 
checking that those executors were not removed while there were still many 
tasks to run and a high `numExecutorsTarget` value.

Code used:

In `ExecutorAllocationManager.start()`:

```
    // Assumes a `var start_time: Long` field added to the manager for this experiment.
    start_time = clock.getTimeMillis()
```

In `ExecutorAllocationManager.schedule()`:
```
    // `now` is the clock timestamp already taken at the top of schedule().
    val executorIdsToBeRemoved = ArrayBuffer[String]()
    if (now > start_time + 1000 * 60 * 2) {
      logInfo("--- REMOVING 1/2 of the EXECUTORS ---")
      // Push start_time far ahead so this test branch only fires once.
      start_time += 1000 * 60 * 100
      var counter = 0
      // Queue every second known executor for removal.
      for (x <- executorIds) {
        counter += 1
        if (counter == 2) {
          counter = 0
          executorIdsToBeRemoved += x
        }
      }
    }
```
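
For context on the assertions in the new suite test below, the executor target tracks the number of outstanding tasks. A hedged sketch of that relationship, mirroring `maxNumExecutorsNeeded` in `ExecutorAllocationManager` (the standalone helper name is ours; `tasksPerExecutor` is cores per executor divided by cores per task):

```
// Executors needed to cover all pending and running tasks, rounded up.
def maxExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
  (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

// e.g. 8 outstanding tasks at 1 task per executor => a target of 8, which is
// why removing executor "1" is refused while numExecutorsTarget == 8.
```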

Author: John Lee <jl...@yahoo-inc.com>

Closes #18874 from yoonlee95/SPARK-21656.

(cherry picked from commit adf005dabe3b0060033e1eeaedbab31a868efc8c)
Signed-off-by: Tom Graves <tgra...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5ede0d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5ede0d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5ede0d5

Branch: refs/heads/branch-2.2
Commit: f5ede0d558e3db51867d8c1c0a12c8fb286c797c
Parents: f1accc8
Author: John Lee <jl...@yahoo-inc.com>
Authored: Wed Aug 16 09:44:09 2017 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Wed Aug 16 09:44:22 2017 -0500

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |   5 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 119 +++++++++++++------
 2 files changed, 89 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5ede0d5/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index fcc72ff..bb5eb7f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -410,7 +410,10 @@ private[spark] class ExecutorAllocationManager(
     executors.foreach { executorIdToBeRemoved =>
       if (newExecutorTotal - 1 < minNumExecutors) {
         logDebug(s"Not removing idle executor $executorIdToBeRemoved because 
there are only " +
-          s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
+          s"$newExecutorTotal executor(s) left (minimum number of executor 
limit $minNumExecutors)")
+      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
+        logDebug(s"Not removing idle executor $executorIdToBeRemoved because 
there are only " +
+          s"$newExecutorTotal executor(s) left (number of executor target 
$numExecutorsTarget)")
       } else if (canBeKilled(executorIdToBeRemoved)) {
         executorIdsToBeRemoved += executorIdToBeRemoved
         newExecutorTotal -= 1

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ede0d5/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 4ea42fc..b9ce71a 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
+  test ("Removing with various numExecutorsTarget condition") {
+    sc = createSparkContext(5, 12, 5)
+    val manager = sc.executorAllocationManager.get
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8)))
+
+    // Remove when numExecutorsTarget is the same as the current number of executors
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
+      info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+    assert(executorIds(manager).size === 8)
+    assert(numExecutorsTarget(manager) === 8)
+    assert(maxNumExecutorsNeeded(manager) == 8)
+    assert(!removeExecutor(manager, "1")) // won't work since 
numExecutorsTarget == numExecutors
+
+    // Remove executors when numExecutorsTarget is lower than current number of executors
+    (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
+      info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) }
+    adjustRequestedExecutors(manager)
+    assert(executorIds(manager).size === 8)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(maxNumExecutorsNeeded(manager) == 5)
+    assert(removeExecutor(manager, "1"))
+    assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3"))
+    onExecutorRemoved(manager, "1")
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
+
+    // numExecutorsTarget is lower than minNumExecutors
+    sc.listenerBus.postToAll(
+      SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
+    assert(executorIds(manager).size === 5)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(maxNumExecutorsNeeded(manager) == 4)
+    assert(!removeExecutor(manager, "4")) // lower limit
+    assert(addExecutors(manager) === 0) // upper limit
+  }
+
   test ("interleaving add and remove") {
-    sc = createSparkContext(5, 10, 5)
+    sc = createSparkContext(5, 12, 5)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
@@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "7")
     onExecutorAdded(manager, "8")
     assert(executorIds(manager).size === 8)
+    assert(numExecutorsTarget(manager) === 8)
 
-    // Remove until limit
-    assert(removeExecutor(manager, "1"))
-    assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
-    assert(!removeExecutor(manager, "4")) // lower limit reached
-    assert(!removeExecutor(manager, "5"))
-    onExecutorRemoved(manager, "1")
-    onExecutorRemoved(manager, "2")
-    onExecutorRemoved(manager, "3")
-    assert(executorIds(manager).size === 5)
 
-    // Add until limit
-    assert(addExecutors(manager) === 2) // upper limit reached
-    assert(addExecutors(manager) === 0)
-    assert(!removeExecutor(manager, "4")) // still at lower limit
-    assert((manager, Seq("5")) !== Seq("5"))
+    // Remove when numTargetExecutors is equal to the current number of executors
+    assert(!removeExecutor(manager, "1"))
+    assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3"))
+
+    // Remove until limit
     onExecutorAdded(manager, "9")
     onExecutorAdded(manager, "10")
     onExecutorAdded(manager, "11")
     onExecutorAdded(manager, "12")
-    onExecutorAdded(manager, "13")
-    assert(executorIds(manager).size === 10)
+    assert(executorIds(manager).size === 12)
+    assert(numExecutorsTarget(manager) === 8)
 
-    // Remove succeeds again, now that we are no longer at the lower limit
-    assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
-    assert(removeExecutor(manager, "7"))
-    assert(executorIds(manager).size === 10)
-    assert(addExecutors(manager) === 0)
+    assert(removeExecutor(manager, "1"))
+    assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
+    assert(!removeExecutor(manager, "5")) // lower limit reached
+    assert(!removeExecutor(manager, "6"))
+    onExecutorRemoved(manager, "1")
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
     onExecutorRemoved(manager, "4")
-    onExecutorRemoved(manager, "5")
     assert(executorIds(manager).size === 8)
 
-    // Number of executors pending restarts at 1
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 0)
-    assert(executorIds(manager).size === 8)
-    onExecutorRemoved(manager, "6")
-    onExecutorRemoved(manager, "7")
+    // Add until limit
+    assert(!removeExecutor(manager, "7")) // still at lower limit
+    assert((manager, Seq("8")) !== Seq("8"))
+    onExecutorAdded(manager, "13")
     onExecutorAdded(manager, "14")
     onExecutorAdded(manager, "15")
-    assert(executorIds(manager).size === 8)
-    assert(addExecutors(manager) === 0) // still at upper limit
     onExecutorAdded(manager, "16")
+    assert(executorIds(manager).size === 12)
+
+    // Remove succeeds again, now that we are no longer at the lower limit
+    assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
+    assert(removeExecutor(manager, "8"))
+    assert(executorIds(manager).size === 12)
+    onExecutorRemoved(manager, "5")
+    onExecutorRemoved(manager, "6")
+    assert(executorIds(manager).size === 10)
+    assert(numExecutorsToAdd(manager) === 4)
+    onExecutorRemoved(manager, "9")
+    onExecutorRemoved(manager, "10")
+    assert(addExecutors(manager) === 4) // at upper limit
     onExecutorAdded(manager, "17")
+    onExecutorAdded(manager, "18")
     assert(executorIds(manager).size === 10)
-    assert(numExecutorsTarget(manager) === 10)
+    assert(addExecutors(manager) === 0) // still at upper limit
+    onExecutorAdded(manager, "19")
+    onExecutorAdded(manager, "20")
+    assert(executorIds(manager).size === 12)
+    assert(numExecutorsTarget(manager) === 12)
   }
 
   test("starting/canceling add timer") {
@@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "third")
     onExecutorAdded(manager, "fourth")
     onExecutorAdded(manager, "fifth")
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", 
"fifth"))
+    onExecutorAdded(manager, "sixth")
+    onExecutorAdded(manager, "seventh")
+    onExecutorAdded(manager, "eighth")
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", 
"fifth",
+      "sixth", "seventh", "eighth"))
 
     removeExecutor(manager, "first")
     removeExecutors(manager, Seq("second", "third"))
     assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", 
"fifth"))
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", 
"fifth",
+      "sixth", "seventh", "eighth"))
 
 
     // Cluster manager lost will make all the live executors lost, so here simulate this behavior

