This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 21bc047  [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager
21bc047 is described below

commit 21bc0474bbb16c7648aed40f25a2945d98d2a167
Author: zebi...@fb.com <zebi...@fb.com>
AuthorDate: Fri Jan 31 08:49:34 2020 -0600

    [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager
    
    ### What changes were proposed in this pull request?
    
    Currently, when speculative tasks fail or get killed, they are still considered pending and
    count towards the calculation of the number of needed executors. To be more precise:
    `stageAttemptToNumSpeculativeTasks(stageAttempt)` is incremented in onSpeculativeTaskSubmitted
    but never decremented; `stageAttemptToNumSpeculativeTasks -= stageAttempt` is only performed
    on stage completion. **This means Spark keeps marking ended speculative tasks as pending,
    which leads Spark to hold more executors [...]
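    
    To make the accounting gap concrete, here is a minimal, self-contained Scala sketch; the map
    names mirror those in `ExecutorAllocationManager`'s listener, but the key type and the
    `pendingSpeculativeTasks` wiring are simplified stand-ins rather than the real code:
    ```
    import scala.collection.mutable

    object PendingSpeculativeSketch {
      type StageAttempt = (Int, Int) // (stageId, stageAttemptId); stand-in for the real key type

      // Incremented on every onSpeculativeTaskSubmitted; cleared only on stage completion.
      val stageAttemptToNumSpeculativeTasks = mutable.HashMap[StageAttempt, Int]()
      // Indices of speculative tasks that have actually started running.
      val stageAttemptToSpeculativeTaskIndices =
        mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

      // Pending speculative tasks = submitted - started. Before this fix, a failed or killed
      // speculative task had its index dropped from the "started" set while the submitted
      // counter stayed put, so its slot flipped back to "pending" and kept inflating
      // maxNumExecutorsNeeded until the whole stage completed.
      def pendingSpeculativeTasks: Int =
        stageAttemptToNumSpeculativeTasks.map { case (sa, submitted) =>
          submitted - stageAttemptToSpeculativeTaskIndices.get(sa).map(_.size).getOrElse(0)
        }.sum
    }
    ```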
    
    This PR fixes the issue by updating `stageAttemptToSpeculativeTaskIndices` and
    `stageAttemptToNumSpeculativeTasks` when speculative tasks complete. This PR also addresses
    some other minor issues: the scheduler behavior after receiving an intentionally killed task
    event, and an attempt to address
    [SPARK-28403](https://issues.apache.org/jira/browse/SPARK-28403).
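    
    As a rough illustration of the intended bookkeeping, here is a hedged, self-contained sketch
    (the exact change is in the diff below; the reason/type names here are stand-ins, not Spark's
    real `TaskEndReason` classes):
    ```
    import scala.collection.mutable

    object TaskEndBookkeepingSketch {
      // Stand-ins for the listener state; not the real ExecutorAllocationManager fields.
      val taskIndices = mutable.HashSet[Int]()
      val speculativeTaskIndices = mutable.HashSet[Int]()
      var numSpeculativeTasks = 0
      var schedulerBacklogged = false
      def totalPendingTasks(): Int = 0 // placeholder for the real pending-task count

      sealed trait Reason
      case object Succeeded extends Reason
      case object Killed extends Reason // intentionally killed: the other copy already won
      case object Failed extends Reason // genuine failure: the task will be resubmitted

      def onTaskEnd(reason: Reason, speculative: Boolean, taskIndex: Int): Unit = {
        if (speculative) {
          // Any finished speculative task stops counting as pending/running (SPARK-30511).
          speculativeTaskIndices.remove(taskIndex)
          numSpeculativeTasks -= 1
        }
        reason match {
          case Succeeded | Killed =>
            // Killed tasks are not resubmitted, so no backlog and the index stays completed.
          case Failed =>
            if (totalPendingTasks() == 0) schedulerBacklogged = true // SPARK-8366
            if (!speculative) taskIndices.remove(taskIndex) // the index becomes pending again
        }
      }
    }
    ```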
    
    ### Why are the changes needed?
    
    This has caused resource wastage in our production environment with speculation enabled.
    With aggressive speculation, we found that data-skewed jobs can hold hundreds of idle
    executors while fewer than 10 tasks are running.
    
    An easy repro of the issue (`--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000` in cluster mode):
    ```
    val n = 4000
    val someRDD = sc.parallelize(1 to n, n)
    someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
      if (index < 300 && index >= 150) {
        Thread.sleep(index * 1000) // Fake running tasks
      } else if (index == 300) {
        Thread.sleep(1000 * 1000) // Fake long running tasks
      }
      it.toList.map(x => index + ", " + x).iterator
    }).collect
    ```
    You will see that while the last task is running, we are holding 38 executors (see below),
    which is exactly (152 + 3) / 4 = 38.
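    
    For reference, the arithmetic behind that estimate, using the figures above (roughly 152
    stale "pending" speculative slots plus 3 genuinely running tasks, with
    `spark.executor.cores=4` and one core per task); variable names are illustrative only:
    ```
    val stalePendingSpeculative = 152 // speculative slots never decremented after kill/failure
    val runningTasks = 3
    val tasksPerExecutor = 4          // spark.executor.cores = 4, spark.task.cpus = 1
    val executorsHeld = (stalePendingSpeculative + runningTasks) / tasksPerExecutor // 155 / 4 = 38
    ```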
    
![image](https://user-images.githubusercontent.com/9404831/72469112-9a7fac00-3793-11ea-8f50-74d0ab7325a4.png)
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a comprehensive unit test.
    
    Testing with the above repro shows that we are holding 2 executors at the end.
    
![image](https://user-images.githubusercontent.com/9404831/72469177-bbe09800-3793-11ea-850f-4a2c67142899.png)
    
    Closes #27223 from linzebing/speculation_fix.
    
    Authored-by: zebi...@fb.com <zebi...@fb.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  61 ++++++----
 .../spark/ExecutorAllocationManagerSuite.scala     | 135 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bff854a..677386c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -263,9 +263,16 @@ private[spark] class ExecutorAllocationManager(
    */
   private def maxNumExecutorsNeeded(): Int = {
     val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
-    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
-              tasksPerExecutorForFullParallelism)
-      .toInt
+    val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
+      tasksPerExecutorForFullParallelism).toInt
+    if (tasksPerExecutorForFullParallelism > 1 && maxNeeded == 1 &&
+      listener.pendingSpeculativeTasks > 0) {
+      // If we have pending speculative tasks and only need a single executor, allocate one more
+      // to satisfy the locality requirements of speculation
+      maxNeeded + 1
+    } else {
+      maxNeeded
+    }
   }
 
   private def totalRunningTasks(): Int = synchronized {
@@ -377,14 +384,8 @@ private[spark] class ExecutorAllocationManager(
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      // Check if there is any speculative jobs pending
-      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
-        numExecutorsTarget =
-          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
-      } else {
-        numExecutorsToAdd = 1
-        return 0
-      }
+      numExecutorsToAdd = 1
+      return 0
     }
 
     val addRequestAcknowledged = try {
@@ -512,7 +513,7 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks to be scheduled in each stageAttempt
+    // Number of speculative tasks pending/running in each stageAttempt
     private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
     // The speculative tasks started in each stageAttempt
     private val stageAttemptToSpeculativeTaskIndices =
@@ -614,18 +615,30 @@ private[spark] class ExecutorAllocationManager(
             stageAttemptToNumRunningTask -= stageAttempt
           }
         }
-        // If the task failed, we expect it to be resubmitted later. To ensure we have
-        // enough resources to run the resubmitted task, we need to mark the scheduler
-        // as backlogged again if it's not already marked as such (SPARK-8366)
-        if (taskEnd.reason != Success) {
-          if (totalPendingTasks() == 0) {
-            allocationManager.onSchedulerBacklogged()
-          }
-          if (taskEnd.taskInfo.speculative) {
-            stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          } else {
-            stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          }
+
+        if (taskEnd.taskInfo.speculative) {
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
+          stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
+        }
+
+        taskEnd.reason match {
+          case Success | _: TaskKilled =>
+          case _ =>
+            if (totalPendingTasks() == 0) {
+              // If the task failed (not intentionally killed), we expect it to be resubmitted
+              // later. To ensure we have enough resources to run the resubmitted task, we need to
+              // mark the scheduler as backlogged again if it's not already marked as such
+              // (SPARK-8366)
+              allocationManager.onSchedulerBacklogged()
+            }
+            if (!taskEnd.taskInfo.speculative) {
+              // If a non-speculative task is intentionally killed, it means the speculative task
+              // has succeeded, and no further task of this task index will be resubmitted. In this
+              // case, the task index is completed and we shouldn't remove it from
+              // stageAttemptToTaskIndices. Otherwise, we will have a pending non-speculative task
+              // for the task index (SPARK-30511)
+              stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
+            }
         }
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 99f3e3b..8d95849 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -264,6 +264,141 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("SPARK-30511 remove executors when speculative tasks end") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 40)
+    val manager = createManager(createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4), clock = clock)
+
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    assert(addExecutors(manager) === 4)
+    assert(addExecutors(manager) === 3)
+
+    (0 to 9).foreach(execId => onExecutorAdded(manager, execId.toString))
+    (0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    assert(numExecutorsTarget(manager) === 10)
+    assert(maxNumExecutorsNeeded(manager) == 10)
+
+    // 30 tasks (0 - 29) finished
+    (0 to 29).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 3)
+    assert(maxNumExecutorsNeeded(manager) == 3)
+    (0 to 6).foreach { i => assert(removeExecutor(manager, i.toString))}
+    (0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}
+
+    // 10 speculative tasks (30 - 39) launch for the remaining tasks
+    (30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTarget(manager) == 5)
+    assert(maxNumExecutorsNeeded(manager) == 5)
+    (10 to 12).foreach(execId => onExecutorAdded(manager, execId.toString))
+    (40 to 49).map { i =>
+      createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
+      .foreach { info => post(SparkListenerTaskStart(0, 0, info))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) == 5) // At this point, we still have 6 executors running
+    assert(maxNumExecutorsNeeded(manager) == 5)
+
+    // 6 speculative tasks (40 - 45) finish before the original tasks, with 4 speculative remaining
+    (40 to 45).map { i =>
+      createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
+      .foreach {
+        info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 4)
+    assert(maxNumExecutorsNeeded(manager) == 4)
+    assert(removeExecutor(manager, "10"))
+    onExecutorRemoved(manager, "10")
+    // At this point, we still have 5 executors running: ["7", "8", "9", "11", "12"]
+
+    // 6 original tasks (30 - 35) are intentionally killed
+    (30 to 35).map { i =>
+      createTaskInfo(i, i, executorId = s"${i / 4}")}
+      .foreach { info => post(
+        SparkListenerTaskEnd(0, 0, null, TaskKilled("test"), info, new ExecutorMetrics, null))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeeded(manager) == 2)
+    (7 to 8).foreach { i => assert(removeExecutor(manager, i.toString))}
+    (7 to 8).foreach { i => onExecutorRemoved(manager, i.toString)}
+    // At this point, we still have 3 executors running: ["9", "11", "12"]
+
+    // Task 36 finishes before the speculative task 46, task 46 killed
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(36, 36, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, TaskKilled("test"),
+      createTaskInfo(46, 36, executorId = "11", speculative = true), new ExecutorMetrics, null))
+
+    // We should have 3 original tasks (index 37, 38, 39) running, with corresponding 3 speculative
+    // tasks running. Target lowers to 2, but still hold 3 executors ["9", "11", "12"]
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeeded(manager) == 2)
+    // At this point, we still have 3 executors running: ["9", "11", "12"]
+
+    // Task 37 and 47 succeed at the same time
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(37, 37, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(47, 37, executorId = "11", speculative = true), new ExecutorMetrics, null))
+
+    // We should have 2 original tasks (index 38, 39) running, with corresponding 2 speculative
+    // tasks running
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 1)
+    assert(maxNumExecutorsNeeded(manager) == 1)
+    assert(removeExecutor(manager, "11"))
+    onExecutorRemoved(manager, "11")
+    // At this point, we still have 2 executors running: ["9", "12"]
+
+    // Task 38 fails and task 49 fails, new speculative task 50 is submitted to speculate on task 39
+    post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
+      createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
+      createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
+    post(SparkListenerSpeculativeTaskSubmitted(0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    // maxNeeded = 1, allocate one more to satisfy speculation locality requirement
+    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeeded(manager) == 2)
+    post(SparkListenerTaskStart(0, 0,
+      createTaskInfo(50, 39, executorId = "12", speculative = true)))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 1)
+    assert(maxNumExecutorsNeeded(manager) == 1)
+
+    // Task 39 and 48 succeed, task 50 killed
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(39, 39, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(48, 38, executorId = "12", speculative = true), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, TaskKilled("test"),
+      createTaskInfo(50, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
+    post(SparkListenerStageCompleted(stage))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 0)
+    assert(maxNumExecutorsNeeded(manager) == 0)
+    assert(removeExecutor(manager, "9"))
+    onExecutorRemoved(manager, "9")
+    assert(removeExecutor(manager, "12"))
+    onExecutorRemoved(manager, "12")
+  }
+
   test("properly handle task end events from completed stages") {
     val manager = createManager(createConf(0, 10, 0))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
