[ https://issues.apache.org/jira/browse/SPARK-10796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278124#comment-15278124 ]

SuYan commented on SPARK-10796:
-------------------------------


main changes:
1. Make the DAGScheduler receive task Resubmitted events only from active
TaskSets. It can then compare a ShuffleMapStage's pendingPartitions with its
missing outputs to tell whether some partitions cannot be computed by the
current TaskSets, and decide whether the ShuffleMapStage needs to be
resubmitted.
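The following is a minimal sketch of the resubmit decision in change 1. It uses a hypothetical, simplified model: partitions are plain `Set[Int]` IDs rather than Spark's `ShuffleMapStage`/`TaskSetManager` objects, and `ResubmitDecision` and its methods are made-up names. It assumes `pendingPartitions` holds only the partitions that the active TaskSet will still compute, as proposed above.

```scala
// Hypothetical sketch; not Spark's actual DAGScheduler code.
object ResubmitDecision {
  // missingOutputs:    partitions of the ShuffleMapStage with no registered MapStatus
  // pendingPartitions: partitions the active TaskSet will still (re)compute
  //                    (assumes zombie TaskSets no longer add to this set)
  def partitionsNoTaskSetWillCompute(missingOutputs: Set[Int], pendingPartitions: Set[Int]): Set[Int] =
    missingOutputs.diff(pendingPartitions)

  // Resubmit the stage only if some missing output is not covered by the active TaskSet.
  def shouldResubmitStage(missingOutputs: Set[Int], pendingPartitions: Set[Int]): Boolean =
    partitionsNoTaskSetWillCompute(missingOutputs, pendingPartitions).nonEmpty

  def main(args: Array[String]): Unit = {
    // Partition 0's output was lost, and only a zombie TaskSet would have re-run it.
    println(shouldResubmitStage(Set(0), Set.empty)) // true
    // Every missing partition is still pending in the active TaskSet.
    println(shouldResubmitStage(Set(1, 2), Set(1, 2))) // false
  }
}
```

The set difference is what prevents the hang. Once zombie TaskSets can no longer mark partitions as pending, any missing output that is absent from `pendingPartitions` has no TaskSet left to produce it.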

other changes:
1. Do not register a running stage's output locations when an executor is lost.
2. Mark a stage's TaskSets as zombie when the stage is marked as finished.
3. Ignore partition output locations reported by expired tasks.
4. Make TaskSetManager not handle a failed task again if that task was already
marked as failed because its executor was lost.

> A stage's TaskSets may all be removed while the stage still has pending
> partitions after some executors are lost
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-10796
>                 URL: https://issues.apache.org/jira/browse/SPARK-10796
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0
>            Reporter: SuYan
>            Priority: Minor
>
> desc:
> 1. A running ShuffleMapStage can have multiple TaskSets: one active TaskSet,
> several zombie TaskSets, and several TaskSets that have already been removed.
> A running ShuffleMapStage succeeds only when all of its partitions have been
> processed successfully, that is, when every task's MapStatus has been added
> to outputLocs.
> The MapStatuses of a running ShuffleMapStage may be produced by any of
> RemovedTaskSet1 / ZombieTaskSet1 / ZombieTaskSet2 / ... / ActiveTaskSetN, so
> some outputs may be held only by a removed or zombie TaskSet.
> When an executor is lost, some of the MapStatuses lost with it may have been
> produced by a zombie TaskSet.
> In the current logic, lost MapStatuses are handled as follows: each TaskSet
> re-runs the tasks that succeeded on the lost executor by re-adding them to
> the TaskSet's pendingTasks and re-adding their partitions to the stage's
> pendingPartitions.
> This does not help when a lost MapStatus belongs only to a zombie or removed
> TaskSet, because a zombie TaskSet never schedules its pendingTasks.
> A stage is resubmitted only when some task throws a FetchFailedException.
> However, the lost executor may hold no MapStatus of the parent stage of a
> running stage. That running stage may still lose a MapStatus that belongs
> only to a zombie or removed TaskSet.
> So once all zombie TaskSets have finished their runningTasks and the active
> TaskSet has finished its pendingTasks, TaskSchedulerImpl removes them, yet
> the running stage's pendingPartitions is still non-empty, and the job hangs.
> {code}
>  test("Resubmit stage while lost partition in ZombieTasksets or RemovedTaskSets") {
>     val firstRDD = new MyRDD(sc, 3, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
>     val firstShuffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
>     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
>     submit(reduceRdd, Array(0))
>     // things start out smoothly, stage 0 completes with no issues
>     complete(taskSets(0), Seq(
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>       (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
>     ))
>     // then start running stage 1
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(0),
>       Success,
>       makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>     // simulate a resubmit of stage 1. Note that in stage 1.0,
>     // partitionId=0 already finished on hostD, so if we resubmit stage 1,
>     // stage 1.1 only resubmits tasks for partitionIds 1 and 2
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(1),
>       FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
>     scheduler.resubmitFailedStages()
>     val stage1Resubmit1 = taskSets(2)
>     assert(stage1Resubmit1.stageId == 1)
>     assert(stage1Resubmit1.tasks.size == 2)
>     // now exec-hostD is lost, so the output loc of stage 1 partitionId=0 is lost.
>     runEvent(ExecutorLost("exec-hostD"))
>     runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
>     // let stage1Resubmit1 complete
>     complete(taskSets(2), Seq(
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
>     ))
>     // and let the running tasks of taskSet 1.0 complete
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(1),
>       Success,
>       makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(2),
>       Success,
>       makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>     // Now all running TaskSets for stage 1 have completed, but partition 0
>     // is still pending and no TaskSet will compute it.
>     assert(scheduler.runningStages.head.pendingPartitions.head == 0)
>  }
> {code}
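The hang can also be reproduced outside Spark with a toy model. This is a sketch under stated assumptions. `ToyTaskSet`, `ToyStage` and `HangDemo` are hypothetical classes that keep only the pieces the description relies on: pendingTasks, pendingPartitions, outputLocs and the zombie flag. The model replays the same sequence as the test above.

```scala
import scala.collection.mutable

// Hypothetical toy model of the hang; not Spark's real scheduler classes.
final class ToyTaskSet(val name: String, partitions: Set[Int]) {
  var isZombie: Boolean = false
  // Tasks this TaskSet could still launch; a zombie TaskSet never launches them.
  val pendingTasks: mutable.Set[Int] = mutable.Set(partitions.toSeq: _*)
}

final class ToyStage(numPartitions: Int) {
  val outputLocs: mutable.Map[Int, String] = mutable.Map.empty // partition -> host
  val pendingPartitions: mutable.Set[Int] = mutable.Set((0 until numPartitions): _*)

  def taskSucceeded(ts: ToyTaskSet, partition: Int, host: String): Unit = {
    ts.pendingTasks -= partition
    outputLocs(partition) = host
    pendingPartitions -= partition
  }

  // Current behaviour as described: outputs on the lost host are dropped and
  // re-added as pending, both on the stage and on the TaskSet that produced them.
  def executorLost(host: String, producer: ToyTaskSet): Unit = {
    val lost = outputLocs.collect { case (p, h) if h == host => p }.toSet
    outputLocs --= lost
    pendingPartitions ++= lost
    producer.pendingTasks ++= lost
  }
}

object HangDemo {
  // Returns (stage pending partitions, tasks any non-zombie TaskSet could still launch).
  def run(): (Set[Int], Set[Int]) = {
    val stage = new ToyStage(3)
    val ts10 = new ToyTaskSet("1.0", Set(0, 1, 2))
    stage.taskSucceeded(ts10, 0, "hostD")

    // A fetch failure makes 1.0 a zombie; 1.1 only covers the missing partitions.
    ts10.isZombie = true
    val ts11 = new ToyTaskSet("1.1", Set(1, 2))

    // hostD is lost: partition 0 becomes pending again, but only in zombie 1.0.
    stage.executorLost("hostD", ts10)

    stage.taskSucceeded(ts11, 1, "hostB")
    stage.taskSucceeded(ts11, 2, "hostB")

    val launchable = Seq(ts10, ts11).filterNot(_.isZombie).flatMap(_.pendingTasks).toSet
    (stage.pendingPartitions.toSet, launchable)
  }

  def main(args: Array[String]): Unit = {
    val (pending, launchable) = run()
    // pending is Set(0), launchable is empty: nothing will ever finish the stage.
    println(s"pending=$pending launchable=$launchable")
  }
}
```

The stage still waits on partition 0, but the only TaskSet holding that task is a zombie. The active TaskSet has nothing left to run. This matches the final assertion of the test.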



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
