[ 
https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kay Ousterhout resolved SPARK-19263.
------------------------------------
       Resolution: Fixed
         Assignee: jin xing
    Fix Version/s: 2.2.0

> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
>                 Key: SPARK-19263
>                 URL: https://issues.apache.org/jira/browse/SPARK-19263
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>            Assignee: jin xing
>             Fix For: 2.2.0
>
>
> In the current *DAGScheduler.handleTaskCompletion* code, when *event.reason* 
> is *Success*, the scheduler first does *stage.pendingPartitions -= 
> task.partitionId*, which can be a bug when a *FetchFailed* has happened 
> earlier. Consider the following scenario (a toy model that replays it 
> follows the list):
> # Stage 0 runs and generates shuffle output data.
> # Stage 1 reads the output from stage 0 and generates more shuffle data. It 
> has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are 
> launched on executorA.
> # ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to 
> the driver. The driver marks executorA as lost and updates failedEpoch.
> # The driver resubmits stage 0 so the missing output can be re-generated, and 
> then once it completes, resubmits stage 1 with ShuffleMapTask1x and 
> ShuffleMapTask2x.
> # ShuffleMapTask2 (from the original attempt of stage 1) successfully 
> finishes on executorA and sends Success back to driver. This causes 
> DAGScheduler::handleTaskCompletion to remove partition 2 from 
> stage.pendingPartitions (line 1149), but it does not add the partition to the 
> set of output locations (line 1192), because the task’s epoch is less than 
> the failure epoch for the executor (because of the earlier failure on 
> executorA).
> # ShuffleMapTask1x successfully finishes on executorB, causing the driver to 
> remove partition 1 from stage.pendingPartitions. Combined with the previous 
> step, this means that there are no more pending partitions for the stage, so 
> the DAGScheduler marks the stage as finished (line 1196). However, the 
> shuffle stage is not available (line 1215), because the completion of 
> ShuffleMapTask2 was ignored due to its epoch, so the DAGScheduler 
> resubmits the stage.
> # ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks 
> is called for the re-submitted stage, it throws an error, because there is 
> already an existing active task set for that stage.
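> Below is a self-contained toy model (hypothetical names, not the Spark 
> source) that replays steps 3-6 above and shows why the DAGScheduler ends up 
> resubmitting the stage while ShuffleMapTask2x is still running:
> {code}
> import scala.collection.mutable
> 
> object PendingPartitionsRace {
>   // Last failure epoch recorded per executor (like DAGScheduler.failedEpoch).
>   val failedEpoch = mutable.HashMap[String, Long]()
>   // Stage 1 has two partitions, both initially pending.
>   val pendingPartitions = mutable.HashSet(1, 2)
>   val outputLocs = mutable.HashSet[Int]()
> 
>   def handleSuccess(partition: Int, execId: String, taskEpoch: Long): Unit = {
>     // The partition is removed from pendingPartitions unconditionally...
>     pendingPartitions -= partition
>     // ...but its output is ignored when the task's epoch is not newer than
>     // the recorded failure on its executor (step 5 above).
>     if (failedEpoch.get(execId).exists(taskEpoch <= _))
>       println(s"ignoring possibly bogus completion of partition $partition")
>     else
>       outputLocs += partition
>     // No pending partitions left, yet an output location is missing: the
>     // stage is marked finished but is not available, so it gets resubmitted
>     // even though a task from the old attempt is still running (step 7).
>     if (pendingPartitions.isEmpty && outputLocs.size < 2)
>       println("stage finished but unavailable -> resubmit -> conflicting task set")
>   }
> 
>   def main(args: Array[String]): Unit = {
>     failedEpoch("executorA") = 1     // step 3: FetchFailed marks executorA failed
>     handleSuccess(2, "executorA", 0) // step 5: old ShuffleMapTask2 succeeds
>     handleSuccess(1, "executorB", 2) // step 6: ShuffleMapTask1x succeeds
>   }
> }
> {code}
> Running it prints both messages: the stale completion is ignored, yet the 
> stage still drains pendingPartitions and is resubmitted.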
> To reproduce the bug:
> 1. Modify *ShuffleBlockFetcherIterator* to throw a FetchFailedException when 
> the task's index in *TaskSetManager* and the stage attempt are both 0 (a 
> hypothetical sketch of such a patch follows the job below);
> 2. Rebuild Spark, then submit the following job:
> {code}
>     val rdd = sc.parallelize(
>       List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
>     rdd.reduceByKey { (v1, v2) =>
>       Thread.sleep(10000)
>       v1 + v2
>     }.map { keyAndValue =>
>       (keyAndValue._1 % 2, keyAndValue._2)
>     }.reduceByKey { (v1, v2) =>
>       Thread.sleep(10000)
>       v1 + v2
>     }.collect()
> {code}
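> The modification from step 1 could look roughly like the sketch below, 
> inserted into *ShuffleBlockFetcherIterator* (hypothetical code, not part of 
> Spark; it uses TaskContext's partitionId and attemptNumber as stand-ins for 
> the task's index in *TaskSetManager* and the stage attempt):
> {code}
> // Inside ShuffleBlockFetcherIterator, before handing a fetched block to the
> // task: fail only the first attempt of the first task, so retries succeed.
> val ctx = org.apache.spark.TaskContext.get()
> if (ctx.partitionId() == 0 && ctx.attemptNumber() == 0) {
>   throw new org.apache.spark.shuffle.FetchFailedException(
>     blockManager.blockManagerId, // blame this executor's shuffle output
>     0,                           // shuffleId: hard-coded for the toy job
>     0,                           // mapId
>     ctx.partitionId(),           // reduceId
>     "injected fetch failure to reproduce SPARK-19263")
> }
> {code}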



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
