[jira] [Assigned] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors
[ https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mridul Muralidharan reassigned SPARK-41192:
-------------------------------------------
    Assignee: Yazhi Wang

> Task finished before speculative task scheduled leads to holding idle
> executors
> ---------------------------------------------------------------------
>
>                 Key: SPARK-41192
>                 URL: https://issues.apache.org/jira/browse/SPARK-41192
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.2, 3.3.1
>            Reporter: Yazhi Wang
>            Assignee: Yazhi Wang
>            Priority: Minor
>              Labels: dynamic_allocation
>         Attachments: dynamic-executors, dynamic-log
>
>
> When a task finishes before its speculative copy has been scheduled by the
> DAGScheduler, the speculative task is still counted as pending in the
> calculation of the number of needed executors, which leads the
> ExecutorAllocationManager to request more executors than are needed.
> h2. Background & Reproduce
> In one of our production jobs, we found that the ExecutorAllocationManager
> was holding more executors than needed.
> The problem was difficult to reproduce in a test environment. To reproduce
> and debug it reliably, we temporarily commented out the scheduling code for
> speculative tasks in TaskSetManager (around line 363), so that every task
> completes before its speculative copy is scheduled.
> {code:java}
> // Original code
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then
>   // schedules a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> }
>
> // Modified so that a speculative task is never scheduled
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Only the regular-task branch remains; the speculative branch is removed
>   dequeueTaskHelper(execId, host, maxLocality, false)
> } {code}
> Referring to the example in SPARK-30511:
> You will see that, while the last task is running, we hold 38 executors (see
> attachment), which is exactly ceil((149 + 1) / 4) = 38. But there are
> actually only 2 tasks running, which requires only
> Math.max(20, ceil(2 / 4)) = 20 executors, 20 being
> spark.dynamicAllocation.minExecutors.
> {code:java}
> ./bin/spark-shell --master yarn \
>   --conf spark.speculation=true \
>   --conf spark.executor.cores=4 \
>   --conf spark.dynamicAllocation.enabled=true \
>   --conf spark.dynamicAllocation.minExecutors=20 \
>   --conf spark.dynamicAllocation.maxExecutors=1000 {code}
> {code:java}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
>   if (index > 3998) {
>     Thread.sleep(1000 * 1000)
>   } else if (index > 3850) {
>     Thread.sleep(50 * 1000) // Fake running tasks
>   } else {
>     Thread.sleep(100)
>   }
>   Array.fill[Int](1)(1).iterator
> }).collect() {code}
>
> I will have a PR ready to fix this issue.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
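The over-allocation arithmetic above can be sketched as follows. This is a simplified model of how the executor target is derived from pending and running task counts, not the actual ExecutorAllocationManager source; the object and method names (ExecutorTargetSketch, maxExecutorsNeeded, boundedTarget) are invented for illustration.

```scala
// Simplified model (assumed, not Spark source) of the dynamic-allocation
// executor target: ceil(total outstanding tasks / tasks per executor),
// clamped between minExecutors and maxExecutors.
object ExecutorTargetSketch {
  // tasksPerExecutor = spark.executor.cores / spark.task.cpus (4 in the repro)
  def maxExecutorsNeeded(pendingTasks: Int, runningTasks: Int,
                         tasksPerExecutor: Int): Int =
    math.ceil((pendingTasks + runningTasks).toDouble / tasksPerExecutor).toInt

  def boundedTarget(needed: Int, minExecutors: Int, maxExecutors: Int): Int =
    math.max(minExecutors, math.min(needed, maxExecutors))

  def main(args: Array[String]): Unit = {
    // Buggy accounting: 149 speculative tasks whose originals already
    // finished are still counted as pending, plus 1 running task.
    val buggy = boundedTarget(maxExecutorsNeeded(149, 1, 4), 20, 1000)
    // Correct accounting: only the 2 genuinely running tasks.
    val fixed = boundedTarget(maxExecutorsNeeded(0, 2, 4), 20, 1000)
    println(s"buggy=$buggy fixed=$fixed") // buggy=38 fixed=20
  }
}
```

With the stale speculative tasks counted, the target is ceil(150 / 4) = 38 executors; with correct accounting it collapses to the minExecutors floor of 20, matching the numbers in the attachment.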