Repository: spark

Updated Branches:
  refs/heads/branch-2.3 b072717b3 -> dbf0b9340
[SPARK-24909][CORE] Always unregister pending partition on task completion.

The Spark scheduler can hang when fetch failures, a lost executor, a task
still running on the lost executor, and multiple stage attempts coincide. To
fix this, we change the scheduler to always unregister the pending partition
on task completion.

This PR effectively reverts the change from SPARK-19263, so that the
scheduler always does shuffleStage.pendingPartitions -= task.partitionId.
The change in SPARK-23433 should fix the issue originally addressed by
SPARK-19263.

Tested with unit tests. The condition arises from a race that I haven't been
able to reproduce on demand; it just shows up occasionally in customer jobs
on a real cluster. I am also working on adding Spark scheduler integration
tests.

Closes #21976 from tgravescs/SPARK-24909.

Authored-by: Thomas Graves <tgra...@unharmedunarmed.corp.ne1.yahoo.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit ec3e9986385880adce1648eae30007eccff862ba)
Signed-off-by: Thomas Graves <tgra...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbf0b934
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbf0b934
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbf0b934

Branch: refs/heads/branch-2.3
Commit: dbf0b934024dfa562775967e3e3114de5a163443
Parents: b072717
Author: Thomas Graves <tgra...@unharmedunarmed.corp.ne1.yahoo.com>
Authored: Wed Aug 29 16:32:02 2018 -0700
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Aug 30 09:10:00 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 17 +-------------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 24 ++++++++++++--------
 2 files changed, 16 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
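For context, below is a minimal, self-contained Scala sketch of the
bookkeeping this patch changes. Everything in it (PendingPartitionSketch, the
exec-A/exec-B executors, taskEpoch) is an illustrative stand-in, not Spark's
real DAGScheduler API; it only shows the shape of the fix: the partition is
unregistered unconditionally, while the epoch check still decides whether the
map output is counted.

import scala.collection.mutable

object PendingPartitionSketch {
  val numPartitions = 2
  val pendingPartitions = mutable.HashSet(0, 1)        // partitions with no counted completion yet
  val mapOutputs = mutable.HashMap.empty[Int, String]  // partitionId -> executorId with valid output
  val failedEpoch = mutable.HashMap("exec-A" -> 2L)    // executorId -> epoch of its last known failure

  // After SPARK-24909: the partition is always unregistered on completion,
  // no matter which stage attempt the finished task belonged to.
  def onShuffleMapTaskCompleted(partitionId: Int, execId: String, taskEpoch: Long): Unit = {
    pendingPartitions -= partitionId                   // unconditional: the fix
    if (failedEpoch.get(execId).exists(taskEpoch <= _)) {
      println(s"Ignoring possibly bogus completion from executor $execId")
    } else {
      mapOutputs(partitionId) = execId                 // count the map output as available
    }
  }

  def main(args: Array[String]): Unit = {
    onShuffleMapTaskCompleted(0, "exec-B", taskEpoch = 3L) // output registered
    onShuffleMapTaskCompleted(1, "exec-A", taskEpoch = 1L) // stale epoch: ignored
    // pendingPartitions is now empty but one output is missing: exactly the
    // state in which the old code could hang and the new code must resubmit.
    println(s"pending=$pendingPartitions outputs=$mapOutputs")
  }
}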
http://git-wip-us.apache.org/repos/asf/spark/blob/dbf0b934/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7029e22..0df38f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1252,18 +1252,10 @@ class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
+            shuffleStage.pendingPartitions -= task.partitionId
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
-              // This task was for the currently running attempt of the stage. Since the task
-              // completed successfully from the perspective of the TaskSetManager, mark it as
-              // no longer pending (the TaskSetManager may consider the task complete even
-              // when the output needs to be ignored because the task's epoch is too small below.
-              // In this case, when pending partitions is empty, there will still be missing
-              // output locations, which will cause the DAGScheduler to resubmit the stage below.)
-              shuffleStage.pendingPartitions -= task.partitionId
-            }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
@@ -1272,13 +1264,6 @@ class DAGScheduler(
               // available.
               mapOutputTracker.registerMapOutput(
                 shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
-              // Remove the task's partition from pending partitions. This may have already been
-              // done above, but will not have been done yet in cases where the task attempt was
-              // from an earlier attempt of the stage (i.e., not the attempt that's currently
-              // running). This allows the DAGScheduler to mark the stage as complete when one
-              // copy of each task has finished successfully, even if the currently active stage
-              // still has tasks running.
-              shuffleStage.pendingPartitions -= task.partitionId
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
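The context line that closes the hunk above is where the drained pending set
gets acted on: because pendingPartitions can now become empty while some map
outputs are still missing, that check is what turns the old hang into a
resubmission. Here is a hedged, self-contained sketch of that decision; the
Stage type is hypothetical, and its isAvailable is a simplified stand-in for
Spark's check that every partition has a registered output.

object FinishOrResubmitSketch {
  final case class Stage(id: Int, pending: Set[Int], availableOutputs: Set[Int], numPartitions: Int) {
    def isAvailable: Boolean = availableOutputs.size == numPartitions
  }

  def onPendingDrained(stage: Stage): String =
    if (stage.pending.nonEmpty) "still waiting on running tasks"
    else if (stage.isAvailable) s"stage ${stage.id} finished: all map outputs registered"
    else s"stage ${stage.id} resubmitted: some completions were ignored, outputs missing"

  def main(args: Array[String]): Unit = {
    // One completion came from a failed executor's old epoch and was ignored,
    // so only one of two outputs is registered even though nothing is pending.
    println(onPendingDrained(Stage(1, pending = Set.empty, availableOutputs = Set(0), numPartitions = 2)))
  }
}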
http://git-wip-us.apache.org/repos/asf/spark/blob/dbf0b934/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8b6ec37..c07b4a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2395,7 +2395,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
 
-    // Both tasks in rddB should be resubmitted, because none of them has succeeded truely.
+    // task(stageId=1, stageAttemptId=1, partitionId=1) should be marked completed when
+    // task(stageId=1, stageAttemptId=0, partitionId=1) finished
+    // ideally we would verify that but no way to get into task scheduler to verify
+
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded truly.
     // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
     // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
     // is still running.
@@ -2404,19 +2408,21 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
 
-    // There should be no new attempt of stage submitted,
-    // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
-    // the current attempt (and hasn't completed successfully in any earlier attempts).
-    assert(taskSets.size === 4)
+    // At this point there should be no active task set for stageId=1 and we need
+    // to resubmit because the output from (stageId=1, stageAttemptId=0, partitionId=1)
+    // was ignored due to executor failure
+    assert(taskSets.size === 5)
+    assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2
+      && taskSets(4).tasks.size === 1)
 
-    // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
+    // Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully.
     runEvent(makeCompletionEvent(
-      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+      taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2)))
 
     // Now the ResultStage should be submitted, because all of the tasks of rddB have
     // completed successfully on alive executors.
-    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
-    complete(taskSets(4), Seq(
+    assert(taskSets.size === 6 && taskSets(5).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(5), Seq(
       (Success, 1),
       (Success, 1)))
   }
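To recap the revised flow, here is a short plain-Scala sketch (it does not use
the suite's runEvent/makeCompletionEvent helpers) encoding only the values the
new assertions pin down; the TaskSetInfo type is hypothetical.

object UpdatedTestExpectations {
  final case class TaskSetInfo(stageId: Int, stageAttemptId: Int, numTasks: Int)

  def main(args: Array[String]): Unit = {
    // taskSets(4): the single-task resubmit of stage 1 required by the new
    // assertions, created because the output produced by
    // (stageId=1, stageAttemptId=0, partitionId=1) was ignored after the
    // executor failure.
    val resubmit = TaskSetInfo(stageId = 1, stageAttemptId = 2, numTasks = 1)
    assert(resubmit.stageAttemptId == 2 && resubmit.numTasks == 1)
    // After taskSets(4).tasks(0) completes, taskSets.size reaches 6 and
    // taskSets(5).tasks(0) is a ResultTask, completed with two Success results.
    println(s"expected resubmit: $resubmit")
  }
}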