This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0aace99  [SPARK-30388][CORE] Mark running map stages of finished job 
as finished, and cancel running tasks
0aace99 is described below

commit 0aace99d1162348269848665725c7db2541807cc
Author: xuesenliang <xuesenli...@tencent.com>
AuthorDate: Tue Mar 3 09:29:43 2020 -0600

    [SPARK-30388][CORE] Mark running map stages of finished job as finished, 
and cancel running tasks
    
    ### What changes were proposed in this pull request?
    
    When a job finishes, its running (re-submitted) map stages should be marked as finished if they are not used by any other job, and the running tasks of these stages should be cancelled.
    
    The ListenerBus should also be notified; otherwise these map stages will stay on the "Active Stages" page of the web UI and never go away.
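    
    The fix (see the diff below) extracts this cancellation into a new ```cancelRunningIndependentStages``` method and calls it when the job completes. The following is a minimal, self-contained sketch of the idea, using hypothetical stand-ins for the scheduler's internal bookkeeping rather than the real ```DAGScheduler``` types:
    
    ```scala
    // Simplified sketch of the control flow, NOT the actual DAGScheduler code
    // (the real change is in the diff below). SketchStage and the maps are
    // hypothetical stand-ins for the scheduler's internal state.
    import scala.collection.mutable

    case class SketchStage(id: Int, jobIds: mutable.HashSet[Int])

    object CancelIndependentStagesSketch {
      val stageIdToStage = mutable.HashMap[Int, SketchStage]()
      val runningStages = mutable.HashSet[SketchStage]()
      val jobIdToStageIds = mutable.HashMap[Int, mutable.HashSet[Int]]()

      /** When job `jobId` finishes, cancel running stages used only by that job. */
      def cancelRunningIndependentStages(jobId: Int, reason: String): Unit = {
        for {
          stageIds <- jobIdToStageIds.get(jobId)
          stageId <- stageIds
          stage <- stageIdToStage.get(stageId)
          // "independent": no other job needs this stage, and it is still running
          if stage.jobIds.forall(_ == jobId) && runningStages.contains(stage)
        } {
          // In the real scheduler this is taskScheduler.cancelTasks(stageId, ...)
          // followed by markStageAsFinished(stage, Some(reason)), which also posts
          // a SparkListenerStageCompleted so the web UI drops the stage from
          // "Active Stages".
          println(s"Cancelling running tasks of stage $stageId: $reason")
          runningStages -= stage
        }
      }
    }
    ```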
    
    For example:
    
    Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 has two partitions, and result stage 1 has two partitions as well.
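    
    For reference, a job with this two-stage, two-partition shape can be produced by something like the sketch below. This is illustrative only, not the exact reproduction used here; hitting the race additionally requires speculative execution to kick in, so the sketch enables it with example settings (not the settings from this report):
    
    ```scala
    // Illustrative only: a minimal job with the shape described above, plus
    // aggressive speculation settings so a speculative copy of a result task
    // is likely to be launched.
    import org.apache.spark.{SparkConf, SparkContext}

    object TwoStageJobSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("SPARK-30388 repro sketch")
          .set("spark.speculation", "true")           // enable speculative execution
          .set("spark.speculation.multiplier", "1.1") // speculate early (example value)
          .set("spark.speculation.quantile", "0.5")   // example value
        val sc = new SparkContext(conf)

        // ShuffleMapStage 0 with 2 partitions, ResultStage 1 with 2 partitions.
        sc.parallelize(0 until 100, numSlices = 2)
          .map(i => (i % 2, i))
          .reduceByKey(_ + _, numPartitions = 2)
          .collect()

        sc.stop()
      }
    }
    ```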
    
    **Steps to reproduce the bug:**
    1. map stage 0: start task 0 (```TID 0```) and task 1 (```TID 1```); both finish successfully.
    2. result stage 1: start task 0 (```TID 2```) and task 1 (```TID 3```).
    3. result stage 1: task 0 (```TID 2```) finishes successfully.
    4. result stage 1: speculative task 1.1 (```TID 4```) is launched, but then fails due to a FetchFailedException.
    5. The driver re-submits map stage 0 and result stage 1.
    6. map stage 0 (retry 1): task 0 (```TID 5```) is launched.
    7. result stage 1: task 1 (```TID 3```) finishes successfully, so job 0 finishes.
    8. map stage 0 is removed from ```runningStages``` and ```stageIdToStage``` because it no longer belongs to any job:
    ```
      private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: 
ActiveJob): HashSet[Stage] = {
       ...
          stageIdToStage.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
            case (stageId, stage) =>
                ...
                def removeStage(stageId: Int): Unit = {
                  for (stage <- stageIdToStage.get(stageId)) {
                    if (runningStages.contains(stage)) {
                      logDebug("Removing running stage %d".format(stageId))
                      runningStages -= stage
                    }
                    ...
                  }
                  stageIdToStage -= stageId
                }
    
                jobSet -= job.jobId
                if (jobSet.isEmpty) { // no other job needs this stage
                  removeStage(stageId)
                }
              }
      ...
      }
    
    ```
    9. map stage 0 (retry 1): task 0 (```TID 5```) finishes successfully, but stage 0 is no longer in ```stageIdToStage```, so ```markStageAsFinished``` is never called for it (see the listener sketch after the snippet below):
    ```
      private[scheduler] def DAGScheduler#handleTaskCompletion(event: 
CompletionEvent): Unit = {
        val task = event.task
        val stageId = task.stageId
        ...
        if (!stageIdToStage.contains(task.stageId)) {
          postTaskEnd(event)
          // Skip all the actions if the stage has been cancelled.
          return
        }
        ...
    ```
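    
    Because ```markStageAsFinished``` is never reached, no ```SparkListenerStageCompleted``` event is posted for the retried map stage. A small listener, sketched below and mirroring what the new test case in this patch does, makes that visible:
    
    ```scala
    // Illustrative sketch: record every stage completion the driver reports.
    // With the bug, ShuffleMapStage 0 (retry 1) finishes its task but never
    // shows up here, so the web UI keeps it under "Active Stages".
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    class StageCompletionRecorder extends SparkListener {
      @volatile var completedStageIds: List[Int] = Nil
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        completedStageIds = completedStageIds :+ event.stageInfo.stageId
      }
    }

    // Usage (with an existing SparkContext `sc`):
    //   val recorder = new StageCompletionRecorder
    //   sc.addSparkListener(recorder)
    //   ... run the job ...
    //   println(recorder.completedStageIds)
    // With the fix, a completion event for the retried map stage is posted as
    // well when the job finishes.
    ```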
    
    #### Relevant Spark driver logs:
    
    ```
    20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at 
NativeMethodAccessorImpl.java:0) with 2 output partitions
    20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0)
    20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 0)
    20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 
0)
    
    20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no 
missing parents
    20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from 
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 
1))
    20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 
tasks
    20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 
0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
    20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 
1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
    20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 
0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
    20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 
1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
    20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at 
NativeMethodAccessorImpl.java:0) finished in 40.854 s
    20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose 
tasks have all completed, from pool
    
    20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 
(MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no 
missing parents
    20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from 
ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) 
(first 15 tasks are for partitions Vector(0, 1))
    20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 
tasks
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 
2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 
3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
    20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 
2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
    
    20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 
9.76.13.26) as speculatable because it ran more than 158 ms
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 
4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
    20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 
9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, 
None), shuffleId=0, mapId=0, reduceId=1, 
message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
    20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) 
failed, but the task will not be re-executed (either because the task failed 
with a shuffle data fetch failure, so the previous stage needs to be re-run, or 
because a different copy of the task has already succeeded).
    20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from 
ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
    20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0) failed in 0.261 s due to 
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
    20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main 
at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0) due to fetch failure
    20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
    
    20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no 
missing parents
    20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from 
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
    20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 
tasks
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 
5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
    
    // NOTE: Here should be "INFO TaskSetManager: Finished task 1.0 in stage 
1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
    // and this bug is being fixed in 
https://issues.apache.org/jira/browse/SPARK-30404
    
    20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 
in stage 1.0 because task 1 has already completed successfully
    
    20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose 
tasks have all completed, from pool
    20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0) finished in 10.131 s
    20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at 
NativeMethodAccessorImpl.java:0, took 51.031212 s
    
    20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 
5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
    20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose 
tasks have all completed, from pool
    ```
    
    ### Why are the changes needed?
    
    The web UI is incorrect: ```stage 0 (retry 1)``` is finished, but it stays on the ```Active Stages``` page.
    
    
![active_stage](https://user-images.githubusercontent.com/4401756/71656718-71185680-2d77-11ea-8dbc-fd8085ab3dfb.png)
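    
    The same symptom can also be checked without opening the UI, e.g. through the monitoring REST API (illustrative sketch; the host, port, and app id are placeholders):
    
    ```scala
    // Illustrative: list the stages the driver still considers active. With the
    // bug, the retried ShuffleMapStage keeps showing up here after the job has
    // finished.
    import scala.io.Source

    object ActiveStagesCheck {
      def main(args: Array[String]): Unit = {
        val appId = "<app-id>" // placeholder: the running application's id
        val json = Source.fromURL(
          s"http://localhost:4040/api/v1/applications/$appId/stages?status=active").mkString
        println(json)
      }
    }
    ```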
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    A new test case is added.
    
    Also tested manually on a cluster; the result is as follows:
    
![cancel_stage](https://user-images.githubusercontent.com/4401756/71658434-04a15580-2d7f-11ea-952b-dd8dd685f37d.png)
    
    Closes #27050 from liangxs/master.
    
    Authored-by: xuesenliang <xuesenli...@tencent.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
    (cherry picked from commit 7a4cf339d7082b576624940253e8283de9e83e19)
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 27 +++++++------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 44 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7bf363d..ec6eff3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1431,6 +1431,7 @@ private[spark] class DAGScheduler(
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
                     markStageAsFinished(resultStage)
+                    cancelRunningIndependentStages(job, s"Job ${job.jobId} is 
finished.")
                     cleanupStateForJobAndIndependentStages(job)
                     try {
                       // killAllTaskAttempts will fail if a SchedulerBackend 
does not implement
@@ -1975,18 +1976,12 @@ private[spark] class DAGScheduler(
     }
   }
 
-  /** Fails a job and all stages that are only used by that job, and cleans up 
relevant state. */
-  private def failJobAndIndependentStages(
-      job: ActiveJob,
-      failureReason: String,
-      exception: Option[Throwable] = None): Unit = {
-    val error = new SparkException(failureReason, exception.orNull)
+  /** Cancel all independent, running stages that are only used by this job. */
+  private def cancelRunningIndependentStages(job: ActiveJob, reason: String): 
Boolean = {
     var ableToCancelStages = true
-
-    // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
-      logError("No stages registered for job " + job.jobId)
+      logError(s"No stages registered for job ${job.jobId}")
     }
     stages.foreach { stageId =>
       val jobsForStage: Option[HashSet[Int]] = 
stageIdToStage.get(stageId).map(_.jobIds)
@@ -1998,12 +1993,12 @@ private[spark] class DAGScheduler(
         if (!stageIdToStage.contains(stageId)) {
           logError(s"Missing Stage for stage with id $stageId")
         } else {
-          // This is the only job that uses this stage, so fail the stage if 
it is running.
+          // This stage is only used by the job, so finish the stage if it is 
running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
               taskScheduler.cancelTasks(stageId, 
shouldInterruptTaskThread(job))
-              markStageAsFinished(stage, Some(failureReason))
+              markStageAsFinished(stage, Some(reason))
             } catch {
               case e: UnsupportedOperationException =>
                 logWarning(s"Could not cancel tasks for stage $stageId", e)
@@ -2013,11 +2008,19 @@ private[spark] class DAGScheduler(
         }
       }
     }
+    ableToCancelStages
+  }
 
-    if (ableToCancelStages) {
+  /** Fails a job and all stages that are only used by that job, and cleans up 
relevant state. */
+  private def failJobAndIndependentStages(
+      job: ActiveJob,
+      failureReason: String,
+      exception: Option[Throwable] = None): Unit = {
+    if (cancelRunningIndependentStages(job, failureReason)) {
       // SPARK-15783 important to cleanup state first, just for tests where we 
have some asserts
       // against the state.  Otherwise we have a *little* bit of flakiness in 
the tests.
       cleanupStateForJobAndIndependentStages(job)
+      val error = new SparkException(failureReason, exception.orNull)
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), 
JobFailed(error)))
     }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 101e60c..72a2e4c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1931,6 +1931,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("shuffle fetch failed on speculative task, but original task succeed 
(SPARK-30388)") {
+    var completedStage: List[Int] = Nil
+    val listener = new SparkListener() {
+      override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+        completedStage = completedStage :+ event.stageInfo.stageId
+      }
+    }
+    sc.addSparkListener(listener)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    assert(completedStage === List(0))
+
+    // result task 0.0 succeed
+    runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, 42))
+    // speculative result task 1.1 fetch failed
+    val info = new TaskInfo(4, index = 1, attemptNumber = 1, 0L, "", "", 
TaskLocality.ANY, true)
+    runEvent(makeCompletionEvent(
+        taskSets(1).tasks(1),
+        FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 
1, "ignored"),
+        null,
+        Seq.empty,
+        Array.empty,
+        info
+      )
+    )
+    assert(completedStage === List(0, 1))
+
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    // map stage resubmitted
+    assert(scheduler.runningStages.size === 1)
+    val mapStage = scheduler.runningStages.head
+    assert(mapStage.id === 0)
+    assert(mapStage.latestInfo.failureReason.isEmpty)
+
+    // original result task 1.0 succeed
+    runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42))
+    assert(completedStage === List(0, 1, 1, 0))
+    assert(scheduler.activeJobs.isEmpty)
+  }
+
   test("misbehaved accumulator should not crash DAGScheduler and 
SparkContext") {
     val acc = new LongAccumulator {
       override def add(v: java.lang.Long): Unit = throw new 
DAGSchedulerSuiteDummyException


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
