Ngone51 commented on code in PR #38711: URL: https://github.com/apache/spark/pull/38711#discussion_r1034375202
########## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ########## @@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager( val stageId = speculativeTask.stageId val stageAttemptId = speculativeTask.stageAttemptId val stageAttempt = StageAttempt(stageId, stageAttemptId) + val taskIndex = speculativeTask.taskIndex allocationManager.synchronized { - stageAttemptToNumSpeculativeTasks(stageAttempt) = - stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1 + stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt, Review Comment: "Unsubmitted" is confusing here, since the method name is `onSpeculativeTaskSubmitted`. How about `stageAttemptToPendingSpeculativeTasks`? ########## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ########## @@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager( } } if (taskEnd.taskInfo.speculative) { - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}} - // If the previous task attempt succeeded first and it was the last task in a stage, - // the stage may have been removed before handing this speculative TaskEnd event. - if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) { - stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1 - } + stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex)) } taskEnd.reason match { - case Success | _: TaskKilled => + case Success => + // remove speculative task for task finished. Review Comment: ```suggestion // Remove pending speculative task in case the normal task is finished before starting the speculative task ``` ########## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ########## @@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager( // Should be 0 when no stages are active.
private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int] private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]] - // Number of speculative tasks pending/running in each stageAttempt - private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int] - // The speculative tasks started in each stageAttempt + // Number of speculative tasks running in each stageAttempt + // TODO(SPARK-14492): We simply need an Int for this. Review Comment: Should this TODO reference SPARK-41192 instead of SPARK-14492? ########## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ########## @@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager( // Should be 0 when no stages are active. private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int] private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]] - // Number of speculative tasks pending/running in each stageAttempt - private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int] - // The speculative tasks started in each stageAttempt + // Number of speculative tasks running in each stageAttempt + // TODO(SPARK-14492): We simply need an Int for this. private val stageAttemptToSpeculativeTaskIndices = + new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]() + // Number of speculative tasks pending in each stageAttempt + private val stageAttemptToUnsubmittedSpeculativeTasks = Review Comment: How about renaming to `stageAttemptToPendingSpeculativeTasks` and renaming `stageAttemptToSpeculativeTaskIndices` to `stageAttemptToRunningSpeculativeTaskIndices`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org