[ 
https://issues.apache.org/jira/browse/SPARK-24909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-24909:
----------------------------------
    Description: 
The DAGScheduler can hang if an executor is lost (due to a fetch failure) and 
all the tasks in the task sets are marked as completed. 
([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1265])

The task scheduler never creates new task attempts, but the DAGScheduler 
still has entries in pendingPartitions.
{code:java}
18/07/22 08:30:00 INFO scheduler.TaskSetManager: Starting task 55769.0 in stage 
44.0 (TID 970752, host1.com, executor 33, partition 55769, PROCESS_LOCAL, 7874 
bytes)

18/07/22 08:30:29 INFO scheduler.DAGScheduler: Marking ShuffleMapStage 44 
(repartition at Lift.scala:191) as failed due to a fetch failure from 
ShuffleMapStage 42 (map at foo.scala:27)
18/07/22 08:30:29 INFO scheduler.DAGScheduler: Resubmitting ShuffleMapStage 42 
(map at foo.scala:27) and ShuffleMapStage 44 (repartition at bar.scala:191) due 
to fetch failure
....

18/07/22 08:30:56 INFO scheduler.DAGScheduler: Executor lost: 33 (epoch 18)
18/07/22 08:30:56 INFO scheduler.DAGScheduler: Shuffle files lost for executor: 
33 (epoch 18)

18/07/22 08:31:20 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 44 
(MapPartitionsRDD[70] at repartition at bar.scala:191), which has no missing 
parents
18/07/22 08:31:21 INFO cluster.YarnClusterScheduler: Adding task set 44.1 with 
59955 tasks

18/07/22 08:31:41 INFO scheduler.TaskSetManager: Finished task 55769.0 in stage 
44.0 (TID 970752) in 101505 ms on host1.com (executor 33) (15081/73320)

18/07/22 08:31:41 INFO scheduler.DAGScheduler: Ignoring possibly bogus 
ShuffleMapTask(44, 55769) completion from executor 33{code}
 

In the logs above you can see that task 55769.0 finished after the executor 
was lost and after a new task set was started. The DAGScheduler logs "Ignoring 
possibly bogus ..." and discards the completion, but the TaskSetManager has 
already marked that task as completed for all stage attempts. The DAGScheduler 
hangs here. I took a heap dump of the process and can see that 55769 is still 
in the DAGScheduler's pendingPartitions list, but the TaskSetManagers are all 
complete.
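To check a driver log for the same sequence, a small scanner along these lines may help. It is only a sketch: the two message formats are copied from the log excerpt above, the function name find_ignored_completions is made up for illustration, and it assumes executor IDs are numeric as in the excerpt.

```python
import re

# Message formats copied from the driver log excerpt in this report.
LOST_RE = re.compile(r"Executor lost: (\d+) \(epoch (\d+)\)")
BOGUS_RE = re.compile(
    r"Ignoring possibly bogus ShuffleMapTask\((\d+), (\d+)\) "
    r"completion from executor (\d+)"
)


def find_ignored_completions(log_text):
    """Return (stage, partition, executor) for every ignored completion that
    came from an executor reported lost earlier in the same log."""
    # Log lines may be wrapped (as in the email), so collapse whitespace first.
    flat = " ".join(log_text.split())
    first_lost = {}
    for m in LOST_RE.finditer(flat):
        first_lost.setdefault(m.group(1), m.start())
    hits = []
    for m in BOGUS_RE.finditer(flat):
        stage, partition, executor = int(m.group(1)), int(m.group(2)), m.group(3)
        if executor in first_lost and first_lost[executor] < m.start():
            hits.append((stage, partition, executor))
    return hits


sample = """18/07/22 08:30:56 INFO scheduler.DAGScheduler: Executor lost: 33 (epoch 18)
18/07/22 08:31:41 INFO scheduler.DAGScheduler: Ignoring possibly bogus
ShuffleMapTask(44, 55769) completion from executor 33"""
print(find_ignored_completions(sample))  # prints [(44, 55769, '33')]
```

A hit by itself does not prove a hang. It only flags partitions worth comparing against pendingPartitions in a heap dump.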

To reproduce this, you need a shuffle map task (call it task1) fetching data 
from an executor that is also running other shuffle map tasks (call one of 
them task2), which fetch from other hosts. task1 has to fail with a fetch 
failure, which causes the stage to fail and the executor to be marked as lost 
due to the fetch failure. The scheduler starts a new task set for the new 
stage attempt, and then task2, which was running on the executor that was 
marked lost, finishes. The scheduler ignores that completion event 
("Ignoring possibly bogus ..."). This results in a hang because at this point 
the TaskSetManager has already marked all tasks for all attempts of that stage 
as completed.
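The divergence between the two sets of bookkeeping can be illustrated with a toy model. This is not Spark's code: ToyTaskSetManager, ToyDagScheduler and simulate are hypothetical names, and the model encodes only the two behaviors stated in this report (a completion marks the partition done in every attempt's task set, and the DAGScheduler ignores completions from an executor it considers lost).

```python
class ToyTaskSetManager:
    """One stage attempt; tracks partitions that still need a task to run."""

    def __init__(self, attempt, partitions):
        self.attempt = attempt
        self.unfinished = set(partitions)

    def mark_finished(self, partition):
        self.unfinished.discard(partition)

    def has_work(self):
        return bool(self.unfinished)


class ToyDagScheduler:
    """Tracks pending partitions and ignores completions from lost executors."""

    def __init__(self, partitions):
        self.pending_partitions = set(partitions)
        self.lost_executors = set()

    def executor_lost(self, executor):
        self.lost_executors.add(executor)

    def task_completed(self, partition, executor):
        if executor in self.lost_executors:
            return False  # models "Ignoring possibly bogus ... completion"
        self.pending_partitions.discard(partition)
        return True


def simulate():
    partition, executor = 55769, "33"
    dag = ToyDagScheduler([partition])
    attempt0 = ToyTaskSetManager(0, [partition])  # task 55769.0 runs on executor 33
    # Fetch failure: executor 33 is marked lost and stage attempt 1 is submitted.
    dag.executor_lost(executor)
    attempt1 = ToyTaskSetManager(1, [partition])
    # Task 55769.0 now finishes on the lost executor.
    for tsm in (attempt0, attempt1):
        tsm.mark_finished(partition)  # marked completed for all attempts
    accepted = dag.task_completed(partition, executor)
    nothing_left_to_run = not any(t.has_work() for t in (attempt0, attempt1))
    hung = bool(dag.pending_partitions) and nothing_left_to_run
    return accepted, hung, dag.pending_partitions
```

In this model simulate() ends with the partition still pending in the scheduler while no task set has anything left to launch, which is the state seen in the heap dump.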

 

Configs needed to be on:

spark.blacklist.application.fetchFailure.enabled=true

spark.files.fetchFailure.unRegisterOutputOnHost=true

spark.shuffle.service.enabled=true
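For a reproduction run, the settings can be passed on the command line as spark-submit --conf key=value pairs. The helper below is a hypothetical convenience for building that argument list; the names REQUIRED_CONF and spark_submit_conf_args are illustrative only, and the keys and values are the ones listed above.

```python
# Settings this report lists as needing to be on to hit the hang.
REQUIRED_CONF = {
    "spark.blacklist.application.fetchFailure.enabled": "true",
    "spark.files.fetchFailure.unRegisterOutputOnHost": "true",
    "spark.shuffle.service.enabled": "true",
}


def spark_submit_conf_args(conf):
    """Render a dict of Spark settings as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Join the arguments into a single string for pasting into a shell command.
print(" ".join(spark_submit_conf_args(REQUIRED_CONF)))
```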


> Spark scheduler can hang when fetch failures, executor lost, task running on 
> lost executor, and multiple stage attempts
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24909
>                 URL: https://issues.apache.org/jira/browse/SPARK-24909
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.3.1
>            Reporter: Thomas Graves
>            Priority: Critical


