Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034375202


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,

Review Comment:
   "Unsubmitted" here is confusing since the method name here is 
`onSpeculativeTaskSubmitted`.. how about 
`stageAttemptToPendingSpeculativeTasks`?
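
   Just to illustrate the reading with that name, here's a minimal standalone sketch (the simplified handler signature and the redefined `StageAttempt` are stand-ins for illustration, not the PR's code):

   ```scala
   import scala.collection.mutable

   // Standalone sketch only: simplified stand-ins for the private types in
   // ExecutorAllocationManager.
   case class StageAttempt(stageId: Int, stageAttemptId: Int)

   // With the suggested name, "pending" reads naturally against
   // onSpeculativeTaskSubmitted: a submitted speculative task stays pending
   // until it actually starts running.
   val stageAttemptToPendingSpeculativeTasks =
     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

   def onSpeculativeTaskSubmitted(stageId: Int, stageAttemptId: Int, taskIndex: Int): Unit = {
     val stageAttempt = StageAttempt(stageId, stageAttemptId)
     // Record the task index as pending for this stage attempt.
     stageAttemptToPendingSpeculativeTasks
       .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int])
       .add(taskIndex)
   }
   ```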



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
           }
         }
         if (taskEnd.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-          // If the previous task attempt succeeded first and it was the last task in a stage,
-          // the stage may have been removed before handing this speculative TaskEnd event.
-          if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-            stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-          }
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
         }
 
         taskEnd.reason match {
-          case Success | _: TaskKilled =>
+          case Success =>
+            // remove speculative task for task finished.

Review Comment:
   ```suggestion
            // Remove pending speculative task in case the normal task is finished before starting the speculative task
   ```
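
   For context, a toy sketch of the bookkeeping this comment describes (simplified types and names, not the actual listener code): when the normal task finishes first, its never-started speculative copy is dropped from the pending set.

   ```scala
   import scala.collection.mutable

   // Toy sketch of the Success-case bookkeeping; simplified stand-ins only.
   case class StageAttempt(stageId: Int, stageAttemptId: Int)

   val stageAttemptToPendingSpeculativeTasks =
     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

   def onNormalTaskSuccess(stageAttempt: StageAttempt, taskIndex: Int): Unit = {
     // The normal task finished before its speculative copy started, so that
     // copy no longer needs an executor: drop it from the pending set.
     stageAttemptToPendingSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))
   }
   ```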



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.

Review Comment:
   SPARK-41192?



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt
+    private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   How about renaming to `stageAttemptToPendingSpeculativeTasks` and renaming `stageAttemptToSpeculativeTaskIndices` to `stageAttemptToRunningSpeculativeTaskIndices`?
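
   To spell out the split those names would give, a sketch (the `totalSpeculativeDemand` helper below is hypothetical and not part of the PR; `StageAttempt` is redefined only for illustration):

   ```scala
   import scala.collection.mutable

   // Sketch of the suggested naming split.
   case class StageAttempt(stageId: Int, stageAttemptId: Int)

   // Speculative tasks submitted but not yet started, per stage attempt.
   val stageAttemptToPendingSpeculativeTasks =
     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
   // Indices of speculative tasks currently running, per stage attempt.
   val stageAttemptToRunningSpeculativeTaskIndices =
     new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

   // Hypothetical helper: speculative demand for a stage attempt is pending + running.
   def totalSpeculativeDemand(stageAttempt: StageAttempt): Int =
     stageAttemptToPendingSpeculativeTasks.get(stageAttempt).map(_.size).getOrElse(0) +
       stageAttemptToRunningSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
   ```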


