[ 
https://issues.apache.org/jira/browse/SPARK-23365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255799#comment-17255799
 ] 

Apache Spark commented on SPARK-23365:
--------------------------------------

User 'Ngone51' has created a pull request for this issue:
https://github.com/apache/spark/pull/30956

> DynamicAllocation with failure in straggler task can lead to a hung spark job
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-23365
>                 URL: https://issues.apache.org/jira/browse/SPARK-23365
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 2.1.2, 2.2.1, 2.3.0
>            Reporter: Imran Rashid
>            Assignee: Imran Rashid
>            Priority: Major
>             Fix For: 2.3.1, 2.4.0
>
>
> Dynamic Allocation can lead to a spark app getting stuck with 0 executors 
> requested when the executors in the last tasks of a taskset fail (eg. with an 
> OOM).
> This happens when {{ExecutorAllocationManager}} s internal target number of 
> executors gets out of sync with {{CoarseGrainedSchedulerBackend}} s target 
> number.  {{EAM}} updates the {{CGSB}} in two ways: (1) it tracks how many 
> tasks are active or pending in submitted stages, and computes how many 
> executors would be needed for them.  And as tasks finish, it will actively 
> decrease that count, informing the {{CGSB}} along the way.  (2) When it 
> decides executors are inactive for long enough, then it requests that 
> {{CGSB}} kill the executors -- this also tells the {{CGSB}} to update its 
> target number of executors: 
> https://github.com/apache/spark/blob/4df84c3f818aa536515729b442601e08c253ed35/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L622
> So when there is just one task left, you could have the following sequence of 
> events:
> (1) the {{EAM}} sets the desired number of executors to 1, and updates the 
> {{CGSB}} too
> (2) while that final task is still running, the other executors cross the 
> idle timeout, and the {{EAM}} requests the {{CGSB}} kill them
> (3) now the {{EAM}} has a target of 1 executor, and the {{CGSB}} has a target 
> of 0 executors
> If the final task completed normally now, everything would be OK; the next 
> taskset would get submitted, the {{EAM}} would increase the target number of 
> executors and it would update the {{CGSB}}.
> But if the executor for that final task failed (eg. an OOM), then the {{EAM}} 
> thinks it [doesn't need to update 
> anything|https://github.com/apache/spark/blob/4df84c3f818aa536515729b442601e08c253ed35/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L384-L386],
>  because its target is already 1, which is all it needs for that final task; 
> and the {{CGSB}} doesn't update anything either since its target is 0.
> I think you can determine if this is the cause of a stuck app by looking for
> {noformat}
> yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
> {noformat}
> in the logs of the ApplicationMaster (at least on yarn).
> You can reproduce this with this test app, run with {{--conf 
> "spark.dynamicAllocation.minExecutors=1" --conf 
> "spark.dynamicAllocation.maxExecutors=5" --conf 
> "spark.dynamicAllocation.executorIdleTimeout=5s"}}
> {code}
> import org.apache.spark.SparkEnv
> sc.setLogLevel("INFO")
> sc.parallelize(1 to 10000, 1000).count()
> val execs = sc.parallelize(1 to 1000, 1000).map { _ => 
> SparkEnv.get.executorId}.collect().toSet
> val badExec = execs.head
> println("will kill exec " + badExec)
> sc.parallelize(1 to 5, 5).mapPartitions { itr =>
>   val exec = SparkEnv.get.executorId
>   if (exec == badExec) {
>     Thread.sleep(20000) // long enough that all the other tasks finish, and 
> the executors cross the idle timeout
>     // now cause the executor to oom
>     var buffers = Seq[Array[Byte]]()
>     while(true) {
>       buffers :+= new Array[Byte](1e8.toInt)
>     }
>     itr
>   } else {
>     itr
>   }
> }.collect()
> {code}
> *EDIT*: I adjusted the repro to cause an OOM on the bad executor, since 
> {{sc.killExecutor}} doesn't play nice with dynamic allocation in other ways.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to