Repository: spark
Updated Branches:
  refs/heads/branch-2.3 b072717b3 -> dbf0b9340


[SPARK-24909][CORE] Always unregister pending partition on task completion.

The Spark scheduler can hang when there is a combination of fetch failures, a 
lost executor, a task still running on the lost executor, and multiple stage 
attempts. To fix this, we always unregister the pending partition on task 
completion.

This PR actually reverts the change made in SPARK-19263, so that the scheduler 
always does shuffleStage.pendingPartitions -= task.partitionId. The change in 
SPARK-23433 should fix the issue originally addressed by SPARK-19263.

Tested with unit tests. The condition depends on a race that I haven't 
reproduced myself on a real customer job; I just see it sometimes on customers' 
jobs in a real cluster.
I am also working on adding Spark scheduler integration tests.

Closes #21976 from tgravescs/SPARK-24909.

Authored-by: Thomas Graves <tgra...@unharmedunarmed.corp.ne1.yahoo.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit ec3e9986385880adce1648eae30007eccff862ba)
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbf0b934
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbf0b934
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbf0b934

Branch: refs/heads/branch-2.3
Commit: dbf0b934024dfa562775967e3e3114de5a163443
Parents: b072717
Author: Thomas Graves <tgra...@unharmedunarmed.corp.ne1.yahoo.com>
Authored: Wed Aug 29 16:32:02 2018 -0700
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Aug 30 09:10:00 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 17 +-------------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 24 ++++++++++++--------
 2 files changed, 16 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbf0b934/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7029e22..0df38f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1252,18 +1252,10 @@ class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
+            shuffleStage.pendingPartitions -= task.partitionId
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (stageIdToStage(task.stageId).latestInfo.attemptNumber == 
task.stageAttemptId) {
-              // This task was for the currently running attempt of the stage. 
Since the task
-              // completed successfully from the perspective of the 
TaskSetManager, mark it as
-              // no longer pending (the TaskSetManager may consider the task 
complete even
-              // when the output needs to be ignored because the task's epoch 
is too small below.
-              // In this case, when pending partitions is empty, there will 
still be missing
-              // output locations, which will cause the DAGScheduler to 
resubmit the stage below.)
-              shuffleStage.pendingPartitions -= task.partitionId
-            }
             if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor 
$execId")
             } else {
@@ -1272,13 +1264,6 @@ class DAGScheduler(
               // available.
               mapOutputTracker.registerMapOutput(
                 shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
-              // Remove the task's partition from pending partitions. This may 
have already been
-              // done above, but will not have been done yet in cases where 
the task attempt was
-              // from an earlier attempt of the stage (i.e., not the attempt 
that's currently
-              // running).  This allows the DAGScheduler to mark the stage as 
complete when one
-              // copy of each task has finished successfully, even if the 
currently active stage
-              // still has tasks running.
-              shuffleStage.pendingPartitions -= task.partitionId
             }
 
             if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingPartitions.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/dbf0b934/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8b6ec37..c07b4a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2395,7 +2395,11 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
 
-    // Both tasks in rddB should be resubmitted, because none of them has 
succeeded truely.
+    // task(stageId=1, stageAttemptId=1, partitionId=1) should be marked 
completed when
+    // task(stageId=1, stageAttemptId=0, partitionId=1) finished
+    // ideally we would verify that but no way to get into task scheduler to 
verify
+
+    // Both tasks in rddB should be resubmitted, because none of them has 
succeeded truly.
     // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) 
successfully.
     // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active 
stage attempt
     // is still running.
@@ -2404,19 +2408,21 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
 
-    // There should be no new attempt of stage submitted,
-    // because task(stageId=1, stageAttempt=1, partitionId=1) is still running 
in
-    // the current attempt (and hasn't completed successfully in any earlier 
attempts).
-    assert(taskSets.size === 4)
+    // At this point there should be no active task set for stageId=1 and we 
need
+    // to resubmit because the output from (stageId=1, stageAttemptId=0, 
partitionId=1)
+    // was ignored due to executor failure
+    assert(taskSets.size === 5)
+    assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2
+      && taskSets(4).tasks.size === 1)
 
-    // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
+    // Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully.
     runEvent(makeCompletionEvent(
-      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+      taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2)))
 
     // Now the ResultStage should be submitted, because all of the tasks of 
rddB have
     // completed successfully on alive executors.
-    assert(taskSets.size === 5 && 
taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
-    complete(taskSets(4), Seq(
+    assert(taskSets.size === 6 && 
taskSets(5).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(5), Seq(
       (Success, 1),
       (Success, 1)))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to