[ https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kay Ousterhout resolved SPARK-19263.
------------------------------------
    Resolution: Fixed
      Assignee: jin xing
Fix Version/s: 1.2.0

> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
>                 Key: SPARK-19263
>                 URL: https://issues.apache.org/jira/browse/SPARK-19263
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>            Assignee: jin xing
>             Fix For: 1.2.0
>
>
> In the current *DAGScheduler.handleTaskCompletion* code, when *event.reason* is
> *Success*, it first does *stage.pendingPartitions -= task.partitionId*, which
> can be a bug when a *FetchFailed* happens. Consider the following scenario:
> # Stage 0 runs and generates shuffle output data.
> # Stage 1 reads the output from stage 0 and generates more shuffle data. It
> has two tasks, ShuffleMapTask1 and ShuffleMapTask2, both launched on executorA.
> # ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to
> the driver. The driver marks executorA as lost and updates failedEpoch.
> # The driver resubmits stage 0 so the missing output can be re-generated, and
> then, once it completes, resubmits stage 1 with ShuffleMapTask1x and
> ShuffleMapTask2x.
> # ShuffleMapTask2 (from the original attempt of stage 1) successfully
> finishes on executorA and sends Success back to the driver. This causes
> DAGScheduler::handleTaskCompletion to remove partition 2 from
> stage.pendingPartitions (line 1149), but it does not add the partition to the
> set of output locations (line 1192), because the task's epoch is less than
> the failure epoch for the executor (due to the earlier failure on executorA).
> # ShuffleMapTask1x successfully finishes on executorB, causing the driver to
> remove partition 1 from stage.pendingPartitions. Combined with the previous
> step, this means that there are no more pending partitions for the stage, so
> the DAGScheduler marks the stage as finished (line 1196).
> However, the
> shuffle stage is not available (line 1215), because the completion for
> ShuffleMapTask2 was ignored due to its epoch, so the DAGScheduler
> resubmits the stage.
> # ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks
> is called for the re-submitted stage, it throws an error because there is
> already an existing active task set.
>
> To reproduce the bug:
> 1. Modify *ShuffleBlockFetcherIterator* so that it throws a
> FetchFailedException when the task's index in the *TaskSetManager* and the
> stage attempt are both 0;
> 2. Rebuild Spark, then submit the following job:
> {code}
> val rdd = sc.parallelize(
>   List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
> rdd.reduceByKey { (v1, v2) =>
>   Thread.sleep(10000)
>   v1 + v2
> }.map { keyAndValue =>
>   (keyAndValue._1 % 2, keyAndValue._2)
> }.reduceByKey { (v1, v2) =>
>   Thread.sleep(10000)
>   v1 + v2
> }.collect
> {code}
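The epoch race in the steps above can be illustrated with a small, self-contained simulation. All names here (StageState, handleSuccess, looksFinished) are hypothetical stand-ins, not the real DAGScheduler code; the sketch only mirrors the order of operations described in the issue: the partition is removed from pendingPartitions unconditionally, but the output location is registered only when the task's epoch is newer than the executor's failure epoch.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the scheduler's bookkeeping.
case class Task(partitionId: Int, executorId: String, epoch: Long)

class StageState(numPartitions: Int) {
  val pendingPartitions: mutable.Set[Int] = mutable.Set(0 until numPartitions: _*)
  val outputLocs: mutable.Set[Int] = mutable.Set.empty           // partitions with registered output
  val failedEpoch: mutable.Map[String, Long] = mutable.Map.empty // executorId -> epoch of last failure

  // Mirrors the order described in the issue for a Success event:
  // pendingPartitions is updated before the epoch check.
  def handleSuccess(task: Task): Unit = {
    pendingPartitions -= task.partitionId
    val stale = failedEpoch.get(task.executorId).exists(task.epoch <= _)
    if (!stale) outputLocs += task.partitionId
  }

  def looksFinished: Boolean = pendingPartitions.isEmpty
  def isAvailable: Boolean = outputLocs.size == numPartitions
}

object EpochRaceDemo {
  def main(args: Array[String]): Unit = {
    val stage = new StageState(numPartitions = 2)

    // FetchFailed from executorA bumps its failure epoch past the running tasks' epoch.
    stage.failedEpoch("executorA") = 1L

    // Stale Success from the original attempt on executorA (epoch 0 <= failure epoch 1):
    // it empties pendingPartitions for this partition but registers no output.
    stage.handleSuccess(Task(partitionId = 1, executorId = "executorA", epoch = 0L))
    // Fresh Success from the resubmitted attempt on executorB:
    stage.handleSuccess(Task(partitionId = 0, executorId = "executorB", epoch = 2L))

    // The stage looks finished (no pending partitions) yet its output is incomplete,
    // so the scheduler would resubmit it while the old task set is still active.
    println(s"looksFinished=${stage.looksFinished}, isAvailable=${stage.isAvailable}")
    // prints: looksFinished=true, isAvailable=false
  }
}
```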