Repository: spark Updated Branches: refs/heads/branch-2.2 f1accc851 -> f5ede0d55
[SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run ## What changes were proposed in this pull request? Right now spark lets go of executors when they are idle for 60s (or a configurable time). I have seen spark let them go when they are idle but they were really needed. I have seen this issue when the scheduler was waiting to get node locality but that takes longer than the default idle timeout. In these jobs the number of executors goes down really small (less than 10) but there are still like 80,000 tasks to run. We should consider not allowing executors to idle timeout if they are still needed according to the number of tasks to be run. ## How was this patch tested? Tested by manually adding executors to the `executorIdsToBeRemoved` list and seeing if those executors were removed when there are a lot of tasks and a high `numExecutorsTarget` value. Code used In `ExecutorAllocationManager.start()` ``` start_time = clock.getTimeMillis() ``` In `ExecutorAllocationManager.schedule()` ``` val executorIdsToBeRemoved = ArrayBuffer[String]() if ( now > start_time + 1000 * 60 * 2) { logInfo("--- REMOVING 1/2 of the EXECUTORS ---") start_time += 1000 * 60 * 100 var counter = 0 for (x <- executorIds) { counter += 1 if (counter == 2) { counter = 0 executorIdsToBeRemoved += x } } } Author: John Lee <jl...@yahoo-inc.com> Closes #18874 from yoonlee95/SPARK-21656. 
(cherry picked from commit adf005dabe3b0060033e1eeaedbab31a868efc8c) Signed-off-by: Tom Graves <tgra...@yahoo-inc.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5ede0d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5ede0d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5ede0d5 Branch: refs/heads/branch-2.2 Commit: f5ede0d558e3db51867d8c1c0a12c8fb286c797c Parents: f1accc8 Author: John Lee <jl...@yahoo-inc.com> Authored: Wed Aug 16 09:44:09 2017 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Wed Aug 16 09:44:22 2017 -0500 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 5 +- .../spark/ExecutorAllocationManagerSuite.scala | 119 +++++++++++++------ 2 files changed, 89 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f5ede0d5/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index fcc72ff..bb5eb7f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -410,7 +410,10 @@ private[spark] class ExecutorAllocationManager( executors.foreach { executorIdToBeRemoved => if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + - s"$newExecutorTotal executor(s) left (limit $minNumExecutors)") + s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)") + } else if (newExecutorTotal - 1 < numExecutorsTarget) { + logDebug(s"Not removing idle executor 
$executorIdToBeRemoved because there are only " + + s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)") } else if (canBeKilled(executorIdToBeRemoved)) { executorIdsToBeRemoved += executorIdToBeRemoved newExecutorTotal -= 1 http://git-wip-us.apache.org/repos/asf/spark/blob/f5ede0d5/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 4ea42fc..b9ce71a 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite assert(executorsPendingToRemove(manager).isEmpty) } + test ("Removing with various numExecutorsTarget condition") { + sc = createSparkContext(5, 12, 5) + val manager = sc.executorAllocationManager.get + + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8))) + + // Remove when numExecutorsTarget is the same as the current number of executors + assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 2) + (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach { + info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + assert(executorIds(manager).size === 8) + assert(numExecutorsTarget(manager) === 8) + assert(maxNumExecutorsNeeded(manager) == 8) + assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors + + // Remove executors when numExecutorsTarget is lower than current number of executors + (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { + info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) } + adjustRequestedExecutors(manager) + assert(executorIds(manager).size === 8) 
+ assert(numExecutorsTarget(manager) === 5) + assert(maxNumExecutorsNeeded(manager) == 5) + assert(removeExecutor(manager, "1")) + assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3")) + onExecutorRemoved(manager, "1") + onExecutorRemoved(manager, "2") + onExecutorRemoved(manager, "3") + + // numExecutorsTarget is lower than minNumExecutors + sc.listenerBus.postToAll( + SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null)) + assert(executorIds(manager).size === 5) + assert(numExecutorsTarget(manager) === 5) + assert(maxNumExecutorsNeeded(manager) == 4) + assert(!removeExecutor(manager, "4")) // lower limit + assert(addExecutors(manager) === 0) // upper limit + } + test ("interleaving add and remove") { - sc = createSparkContext(5, 10, 5) + sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) @@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, "7") onExecutorAdded(manager, "8") assert(executorIds(manager).size === 8) + assert(numExecutorsTarget(manager) === 8) - // Remove until limit - assert(removeExecutor(manager, "1")) - assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3")) - assert(!removeExecutor(manager, "4")) // lower limit reached - assert(!removeExecutor(manager, "5")) - onExecutorRemoved(manager, "1") - onExecutorRemoved(manager, "2") - onExecutorRemoved(manager, "3") - assert(executorIds(manager).size === 5) - // Add until limit - assert(addExecutors(manager) === 2) // upper limit reached - assert(addExecutors(manager) === 0) - assert(!removeExecutor(manager, "4")) // still at lower limit - assert((manager, Seq("5")) !== Seq("5")) + // Remove when numTargetExecutors is equal to the current number of executors + assert(!removeExecutor(manager, "1")) + assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3")) + + // Remove until limit onExecutorAdded(manager, "9") 
onExecutorAdded(manager, "10") onExecutorAdded(manager, "11") onExecutorAdded(manager, "12") - onExecutorAdded(manager, "13") - assert(executorIds(manager).size === 10) + assert(executorIds(manager).size === 12) + assert(numExecutorsTarget(manager) === 8) - // Remove succeeds again, now that we are no longer at the lower limit - assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6")) - assert(removeExecutor(manager, "7")) - assert(executorIds(manager).size === 10) - assert(addExecutors(manager) === 0) + assert(removeExecutor(manager, "1")) + assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4")) + assert(!removeExecutor(manager, "5")) // lower limit reached + assert(!removeExecutor(manager, "6")) + onExecutorRemoved(manager, "1") + onExecutorRemoved(manager, "2") + onExecutorRemoved(manager, "3") onExecutorRemoved(manager, "4") - onExecutorRemoved(manager, "5") assert(executorIds(manager).size === 8) - // Number of executors pending restarts at 1 - assert(numExecutorsToAdd(manager) === 1) - assert(addExecutors(manager) === 0) - assert(executorIds(manager).size === 8) - onExecutorRemoved(manager, "6") - onExecutorRemoved(manager, "7") + // Add until limit + assert(!removeExecutor(manager, "7")) // still at lower limit + assert((manager, Seq("8")) !== Seq("8")) + onExecutorAdded(manager, "13") onExecutorAdded(manager, "14") onExecutorAdded(manager, "15") - assert(executorIds(manager).size === 8) - assert(addExecutors(manager) === 0) // still at upper limit onExecutorAdded(manager, "16") + assert(executorIds(manager).size === 12) + + // Remove succeeds again, now that we are no longer at the lower limit + assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7")) + assert(removeExecutor(manager, "8")) + assert(executorIds(manager).size === 12) + onExecutorRemoved(manager, "5") + onExecutorRemoved(manager, "6") + assert(executorIds(manager).size === 10) + assert(numExecutorsToAdd(manager) === 4) + 
onExecutorRemoved(manager, "9") + onExecutorRemoved(manager, "10") + assert(addExecutors(manager) === 4) // at upper limit onExecutorAdded(manager, "17") + onExecutorAdded(manager, "18") assert(executorIds(manager).size === 10) - assert(numExecutorsTarget(manager) === 10) + assert(addExecutors(manager) === 0) // still at upper limit + onExecutorAdded(manager, "19") + onExecutorAdded(manager, "20") + assert(executorIds(manager).size === 12) + assert(numExecutorsTarget(manager) === 12) } test("starting/canceling add timer") { @@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, "third") onExecutorAdded(manager, "fourth") onExecutorAdded(manager, "fifth") - assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + onExecutorAdded(manager, "sixth") + onExecutorAdded(manager, "seventh") + onExecutorAdded(manager, "eighth") + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth", + "sixth", "seventh", "eighth")) removeExecutor(manager, "first") removeExecutors(manager, Seq("second", "third")) assert(executorsPendingToRemove(manager) === Set("first", "second", "third")) - assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth", + "sixth", "seventh", "eighth")) // Cluster manager lost will make all the live executors lost, so here simulate this behavior --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org