Zebing Lin updated SPARK-30511:
-------------------------------
    Attachment: Screen Shot 2020-01-15 at 11.13.17.png

Spark marks ended speculative tasks as pending leads to holding idle executors
------------------------------------------------------------------------------

                 Key: SPARK-30511
                 URL: https://issues.apache.org/jira/browse/SPARK-30511
             Project: Spark
          Issue Type: Bug
          Components: Scheduler
    Affects Versions: 2.3.0
            Reporter: Zebing Lin
            Priority: Major
         Attachments: Screen Shot 2020-01-15 at 11.13.17.png

*TL;DR* When speculative tasks finish, fail, or get killed, they are still counted as pending and therefore factor into the calculation of the number of executors needed.

h3. Symptom

In one of our production jobs (running 4 tasks per executor), we found that it was holding 6 executors at the end with only 2 tasks running (1 of them speculative). With more logging enabled, we saw the job print:
{code:java}
pendingTasks is 0 pendingSpeculativeTasks is 17 totalRunningTasks is 2
{code}
while the job had only 1 speculative task running; the other 16 speculative tasks had been intentionally killed because their corresponding original tasks had finished.

An easy repro of the issue (`--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000` in cluster mode):
{code:java}
val n = 4000
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index < 300 && index >= 150) {
    Thread.sleep(index * 1000) // Fake running tasks
  } else if (index == 300) {
    Thread.sleep(1000 * 1000) // Fake long running tasks
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}
You will see that while the last task is running, Spark holds 39 executors (see attachment).
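The inflated executor count follows directly from the logged numbers. A minimal sketch, assuming dynamic allocation targets roughly ceil((pendingTasks + pendingSpeculativeTasks + totalRunningTasks) / tasksPerExecutor) executors; the function below is a hypothetical stand-in for the heuristic in ExecutorAllocationManager, not the actual implementation:

```scala
// Hedged sketch: how a stale pendingSpeculativeTasks count inflates the
// executor target. maxNumExecutorsNeeded is a hypothetical stand-in for
// the ExecutorAllocationManager heuristic.
def maxNumExecutorsNeeded(pendingTasks: Int,
                          pendingSpeculativeTasks: Int,
                          totalRunningTasks: Int,
                          tasksPerExecutor: Int): Int = {
  val total = pendingTasks + pendingSpeculativeTasks + totalRunningTasks
  (total + tasksPerExecutor - 1) / tasksPerExecutor // ceiling division
}

// With the logged numbers (0 pending, 17 stale speculative, 2 running,
// 4 tasks per executor) the target stays at 5 executors:
val inflated = maxNumExecutorsNeeded(0, 17, 2, 4) // 5
// If the ended speculative tasks were not counted, 1 executor suffices:
val correct = maxNumExecutorsNeeded(0, 0, 2, 4)   // 1
```

Under this approximation the job keeps asking for several executors to serve speculative tasks that no longer exist.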
h3. The Bug

Upon examining the code of _pendingSpeculativeTasks_:
{code:java}
stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
  numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
}.sum
{code}
we see that _stageAttemptToNumSpeculativeTasks(stageAttempt)_ is incremented in _onSpeculativeTaskSubmitted_ but never decremented; _stageAttemptToNumSpeculativeTasks -= stageAttempt_ is only performed on stage completion. *This means Spark marks ended speculative tasks as pending, which leads Spark to hold more executors than it actually needs!*

I will have a PR ready to fix this issue, along with SPARK-2840 too.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
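The accounting bug and the proposed fix can be sketched in a self-contained way. This is a hypothetical, simplified model of the listener state quoted above, not the actual ExecutorAllocationManager code; the fix is the decrement in onSpeculativeTaskEnd:

```scala
import scala.collection.mutable

// Simplified sketch of the speculative-task accounting, with the proposed
// fix applied: the per-stage submitted counter is decremented when a
// speculative task ends, so ended tasks no longer count as pending.
object SpeculativeAccounting {
  // stageAttempt -> number of speculative tasks submitted and not yet ended
  val stageAttemptToNumSpeculativeTasks = mutable.Map.empty[Int, Int]
  // stageAttempt -> indices of speculative tasks currently running
  val stageAttemptToSpeculativeTaskIndices = mutable.Map.empty[Int, mutable.Set[Int]]

  def onSpeculativeTaskSubmitted(stageAttempt: Int): Unit =
    stageAttemptToNumSpeculativeTasks(stageAttempt) =
      stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1

  def onSpeculativeTaskStart(stageAttempt: Int, index: Int): Unit =
    stageAttemptToSpeculativeTaskIndices
      .getOrElseUpdate(stageAttempt, mutable.Set.empty) += index

  // The fix: on task end, remove the task from the running set AND
  // decrement the submitted counter (the buggy code only cleaned the
  // counter up on stage completion).
  def onSpeculativeTaskEnd(stageAttempt: Int, index: Int): Unit = {
    stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_ -= index)
    stageAttemptToNumSpeculativeTasks(stageAttempt) =
      stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 1) - 1
  }

  // Same formula as the snippet quoted above: submitted minus running.
  def pendingSpeculativeTasks: Int =
    stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
      numTasks -
        stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
    }.sum
}
```

With the decrement in place, a speculative task that is submitted, started, and then killed leaves pendingSpeculativeTasks at 0; without it, the task would be counted as pending forever, which is exactly the behavior seen in the production job above.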