toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044148832


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   How about this:
   ```
   case class SparkListenerSpeculativeTaskSubmitted(
       stageId: Int,
       stageAttemptId: Int = 0)
     extends SparkListenerEvent {
     // Note: this is here for backwards-compatibility with older versions of this event which
     // didn't store taskIndex or partitionId
     private var _taskIndex: Int = -1
     private var _partitionId: Int = -1
   
     def taskIndex: Int = _taskIndex
     def partitionId: Int = _partitionId
   
     def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
       this(stageId, stageAttemptId)
       _partitionId = partitionId
       _taskIndex = taskIndex
     }
   }
   ```
   
   We can then construct the event with `taskIndex` and `partitionId` directly:
   ```
   val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted(
         task.stageId, task.stageAttemptId, taskIndex, task.partitionId)
   ```
   
   The default usage (without `taskIndex`/`partitionId`) stays the same:
   ```
   val speculativeTaskSubmittedEvent = SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to