Repository: spark
Updated Branches:
  refs/heads/master b1581ac28 -> b85f9a242


[SPARK-8366] maxNumExecutorsNeeded should properly handle failed tasks

Author: xutingjun <xuting...@huawei.com>
Author: meiyoula <1039320...@qq.com>

Closes #6817 from XuTingjun/SPARK-8366.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b85f9a24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b85f9a24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b85f9a24

Branch: refs/heads/master
Commit: b85f9a242a12e8096e331fa77d5ebd16e93c844d
Parents: b1581ac
Author: xutingjun <xuting...@huawei.com>
Authored: Tue Aug 11 23:19:35 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Aug 11 23:19:35 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 22 +++++++++++++-------
 .../spark/ExecutorAllocationManagerSuite.scala  | 22 ++++++++++++++++++--
 2 files changed, 34 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b85f9a24/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1877aaf..b93536e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -599,14 +599,8 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last pending task, mark the scheduler queue as empty
         stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
-        val numTasksScheduled = stageIdToTaskIndices(stageId).size
-        val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
-        if (numTasksScheduled == numTasksTotal) {
-          // No more pending tasks for this stage
-          stageIdToNumTasks -= stageId
-          if (stageIdToNumTasks.isEmpty) {
-            allocationManager.onSchedulerQueueEmpty()
-          }
+        if (totalPendingTasks() == 0) {
+          allocationManager.onSchedulerQueueEmpty()
         }
 
         // Mark the executor on which this task is scheduled as busy
@@ -618,6 +612,8 @@ private[spark] class ExecutorAllocationManager(
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
+      val taskIndex = taskEnd.taskInfo.index
+      val stageId = taskEnd.stageId
       allocationManager.synchronized {
         numRunningTasks -= 1
         // If the executor is no longer running any scheduled tasks, mark it as idle
@@ -628,6 +624,16 @@ private[spark] class ExecutorAllocationManager(
             allocationManager.onExecutorIdle(executorId)
           }
         }
+
+        // If the task failed, we expect it to be resubmitted later. To ensure we have
+        // enough resources to run the resubmitted task, we need to mark the scheduler
+        // as backlogged again if it's not already marked as such (SPARK-8366)
+        if (taskEnd.reason != Success) {
+          if (totalPendingTasks() == 0) {
+            allocationManager.onSchedulerBacklogged()
+          }
+          stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b85f9a24/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 34caca8..f374f97 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -206,8 +206,8 @@ class ExecutorAllocationManagerSuite
 
     val task2Info = createTaskInfo(1, 0, "executor-1")
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))
 
     assert(adjustRequestedExecutors(manager) === -1)
   }
@@ -787,6 +787,24 @@ class ExecutorAllocationManagerSuite
       Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
   }
 
+  test("SPARK-8366: maxNumExecutorsNeeded should properly handle failed tasks") {
+    sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(maxNumExecutorsNeeded(manager) === 0)
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+
+    val taskInfo = createTaskInfo(1, 1, "executor-1")
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+
+    // If the task is failed, we expect it to be resubmitted later.
+    val taskEndReason = ExceptionFailure(null, null, null, null, null)
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to