[ 
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635763#comment-17635763
 ] 

Apache Spark commented on SPARK-41192:
--------------------------------------

User 'toujours33' has created a pull request for this issue:
https://github.com/apache/spark/pull/38711

> Task finished before speculative task scheduled leads to holding idle 
> executors
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-41192
>                 URL: https://issues.apache.org/jira/browse/SPARK-41192
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.2, 3.3.1
>            Reporter: Yazhi Wang
>            Priority: Minor
>              Labels: dynamic_allocation
>         Attachments: dynamic-executors, dynamic-log
>
>
> When a task finishes before its speculative copy has been scheduled by 
> DAGScheduler, the speculative task is still considered pending and 
> counts towards the number of needed executors, which leads 
> to requesting more executors than needed.
> h2. Background & Reproduce
> In one of our production jobs, we found that ExecutorAllocationManager was 
> holding more executors than needed. 
> The problem is difficult to reproduce in a test environment. To 
> reproduce and debug it reliably, we temporarily commented out the scheduling 
> of speculative tasks in TaskSetManager:363, which ensures that the task 
> completes before its speculative copy is scheduled.
> {code:java}
> // Original code
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then schedules
>   // a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> }
>
> // Modified for reproduction: speculative tasks will never be scheduled
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Only tries to schedule a regular task; the speculative fallback is removed
>   dequeueTaskHelper(execId, host, maxLocality, false)
> }
> {code}
> This follows the example in SPARK-30511.
> While the last task is running, 38 executors are held (see the 
> attachment), which is exactly ceil((149 + 1) / 4) = 38. But only 2 tasks are 
> actually running, which requires just Math.max(20, ceil(2 / 4)) = 20 executors.
> {code:java}
> ./bin/spark-shell --master yarn \
>   --conf spark.speculation=true \
>   --conf spark.executor.cores=4 \
>   --conf spark.dynamicAllocation.enabled=true \
>   --conf spark.dynamicAllocation.minExecutors=20 \
>   --conf spark.dynamicAllocation.maxExecutors=1000
> {code}
> {code:java}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {
>   if (index > 3998) {
>     Thread.sleep(1000 * 1000) // The last, very long-running task
>   } else if (index > 3850) {
>     Thread.sleep(50 * 1000) // Fake running tasks
>   } else {
>     Thread.sleep(100)
>   }
>   Array.fill[Int](1)(1).iterator
> }).collect() // The closing brace and action were missing from the snippet; collect() is assumed
> {code}
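>
> The executor counts quoted above can be checked with a small standalone 
> sketch. It is not Spark's ExecutorAllocationManager code: the object name, 
> the helper targetExecutors and its parameters are hypothetical, and it 
> assumes 4 task slots per executor (spark.executor.cores=4 with the default 
> of one CPU per task) and the min/max bounds from the command above.
> {code:java}
> // Hypothetical sketch, not Spark's actual allocation logic.
> object ExecutorTargetSketch {
>   // Executors needed for `outstandingTasks` (pending + running) tasks,
>   // clamped to the [minExecutors, maxExecutors] range.
>   def targetExecutors(
>       outstandingTasks: Int,
>       tasksPerExecutor: Int,
>       minExecutors: Int,
>       maxExecutors: Int): Int = {
>     // Integer ceiling of outstandingTasks / tasksPerExecutor
>     val needed = (outstandingTasks + tasksPerExecutor - 1) / tasksPerExecutor
>     math.max(minExecutors, math.min(maxExecutors, needed))
>   }
>
>   def main(args: Array[String]): Unit = {
>     // 149 + 1 tasks counted, as in the report: 38 executors held
>     println(targetExecutors(149 + 1, 4, 20, 1000))
>     // Only 2 tasks actually running: the minimum of 20 is enough
>     println(targetExecutors(2, 4, 20, 1000))
>   }
> }
> {code}
> With the stale speculative tasks counted, the target stays at 38; without 
> them, it falls back to the configured minimum of 20.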
>  
> I will have a PR ready to fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
