[jira] [Updated] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

2022-11-17 Thread Yazhi Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yazhi Wang updated SPARK-41192:
---
Description: 
When a task finishes before its speculative task has been scheduled by the DAGScheduler, the speculative task is still considered pending and counted in the calculation of the number of needed executors, which leads to requesting more executors than needed.
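To illustrate the effect, here is a simplified sketch of how a pending-plus-running task count turns into an executor target (the method name and signature are illustrative, not the actual ExecutorAllocationManager code); speculative tasks that remain "pending" after their original task has already finished inflate this number:
{code:java}
// Simplified, illustrative sketch (not the exact Spark code) of the
// dynamic-allocation target. The task count here still includes speculative
// tasks whose original task has already finished, which inflates the result.
def maxExecutorsNeeded(pendingAndRunningTasks: Int, tasksPerExecutor: Int): Int =
  math.ceil(pendingAndRunningTasks.toDouble / tasksPerExecutor).toInt

// With the numbers from this report, (149 + 1) tasks are counted against
// 4 cores per executor, so 38 executors are requested.
maxExecutorsNeeded(149 + 1, 4)  // 38
{code}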
h2. Background & Reproduce

In one of our production jobs, we found that the ExecutorAllocationManager was holding more executors than needed.

We found this difficult to reproduce in a test environment. In order to reproduce it stably and debug it, we temporarily commented out the scheduling of speculative tasks in TaskSetManager:363, so that a task always completes before its speculative task is scheduled.
{code:java}
// Original code
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Tries to schedule a regular task first; if it returns None, then schedules
  // a speculative task
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}

// Modified code: the speculative task will never be scheduled
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Only tries to schedule a regular task; the speculative branch is removed
  dequeueTaskHelper(execId, host, maxLocality, false)
}
{code}
Referring to the example in SPARK-30511:

When the last tasks are running, you will see that we are holding 38 executors (see attachment), which is exactly ceil((149 + 1) / 4) = 38. But there are actually only 2 tasks still running, which should require only max(20, ceil(2 / 4)) = 20 executors.
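As a rough worked version of that calculation (a sketch that assumes the target is simply clamped below by spark.dynamicAllocation.minExecutors; not the exact Spark code):
{code:java}
// Hypothetical worked example with the settings used below
// (minExecutors = 20, 4 cores per executor).
val minExecutors = 20
val coresPerExecutor = 4

// What is actually requested for the (149 + 1) tasks that are still counted.
val requested = math.max(minExecutors,
  math.ceil((149 + 1).toDouble / coresPerExecutor).toInt)  // 38

// What the 2 genuinely running tasks need.
val needed = math.max(minExecutors,
  math.ceil(2.toDouble / coresPerExecutor).toInt)          // 20
{code}
The spark-shell settings and job below were used to reproduce this behavior.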
{code:java}
./bin/spark-shell --master yarn \
  --conf spark.speculation=true \
  --conf spark.executor.cores=4 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=20 \
  --conf spark.dynamicAllocation.maxExecutors=1000
{code}
{code:java}
val n = 4000
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index > 3998) {
    Thread.sleep(1000 * 1000) // Fake one long-running task
  } else if (index > 3850) {
    Thread.sleep(50 * 1000) // Fake running tasks
  } else {
    Thread.sleep(100)
  }
  Array.fill[Int](1)(1).iterator
}).collect() // an action is needed to actually run the job
{code}
 

I will have a PR ready to fix this issue.


[jira] [Updated] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

2022-11-17 Thread Yazhi Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yazhi Wang updated SPARK-41192:
---
Attachment: dynamic-executors

> Task finished before speculative task scheduled leads to holding idle executors
> --------------------------------------------------------------------------------
>
> Key: SPARK-41192
> URL: https://issues.apache.org/jira/browse/SPARK-41192
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.2, 3.3.1
> Reporter: Yazhi Wang
> Priority: Minor
> Labels: dynamic_allocation
> Attachments: dynamic-executors, dynamic-log



[jira] [Updated] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

2022-11-17 Thread Yazhi Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yazhi Wang updated SPARK-41192:
---
Attachment: dynamic-log




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org