mridulm commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r892802648


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
    * Check if the task associated with the given tid has past the time threshold and should be
    * speculative run.
    */
-  private def checkAndSubmitSpeculatableTask(
-      tid: Long,
+  private def checkAndSubmitSpeculatableTasks(
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      if (!successful(index) && copiesRunning(index) == 1 && !speculatableTasks.contains(index)) {
+        val runtimeMs = info.timeRunning(currentTimeMillis)
+
+        def checkMaySpeculate(): Boolean = {
+          if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
+            true
+          } else {
+            val longTimeTask = (numTasks <= 1) ||
+              runtimeMs > efficientTaskDurationFactor * threshold
+            longTimeTask || inefficientTaskCalculator.get.maySpeculateTask(tid, runtimeMs, info)
+          }
+        }
+
+        val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate()
+        val executorDecommissionSpeculate =
+          (!maySpeculate &&
+            executorDecommissionKillInterval.isDefined && !successfulTaskDurations.isEmpty()) && {
+            val taskInfo = taskInfos(tid)

Review Comment:
   nit: Use `info` here. (see below btw)



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
    * Check if the task associated with the given tid has past the time threshold and should be
    * speculative run.
    */
-  private def checkAndSubmitSpeculatableTask(
-      tid: Long,
+  private def checkAndSubmitSpeculatableTasks(
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      if (!successful(index) && copiesRunning(index) == 1 && !speculatableTasks.contains(index)) {
+        val runtimeMs = info.timeRunning(currentTimeMillis)
+
+        def checkMaySpeculate(): Boolean = {
+          if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
+            true
+          } else {
+            val longTimeTask = (numTasks <= 1) ||
+              runtimeMs > efficientTaskDurationFactor * threshold
+            longTimeTask || inefficientTaskCalculator.get.maySpeculateTask(tid, runtimeMs, info)
+          }
+        }
+
+        val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate()
+        val executorDecommissionSpeculate =
+          (!maySpeculate &&
+            executorDecommissionKillInterval.isDefined && !successfulTaskDurations.isEmpty()) && {
+            val taskInfo = taskInfos(tid)
+            val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
+            decomState.isDefined && {
+              // Check if this task might finish after this executor is decommissioned.
+              // We estimate the task's finish time by using the median task duration.
+              // Whereas the time when the executor might be decommissioned is estimated using the
+              // config executorDecommissionKillInterval. If the task is going to finish after
+              // decommissioning, then we will eagerly speculate the task.
+              val taskEndTimeBasedOnMedianDuration =
+              taskInfos(tid).launchTime + successfulTaskDurations.median
+              val executorDecomTime =
+                decomState.get.startTime + executorDecommissionKillInterval.get
+              executorDecomTime < taskEndTimeBasedOnMedianDuration
+            }
+          }

Review Comment:
   nit: A simplification here would be:
   
   ```suggestion
            sched.getExecutorDecommissionState(taskInfo.executorId).exists { decomState =>
              // Check if this task might finish after this executor is decommissioned.
              // We estimate the task's finish time by using the median task duration.
              // Whereas the time when the executor might be decommissioned is estimated using the
              // config executorDecommissionKillInterval. If the task is going to finish after
              // decommissioning, then we will eagerly speculate the task.
              val taskEndTimeBasedOnMedianDuration =
              info.launchTime + successfulTaskDurations.median
              val executorDecomTime =
                decomState.startTime + executorDecommissionKillInterval.get
              executorDecomTime < taskEndTimeBasedOnMedianDuration
            }
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -800,6 +814,10 @@ private[spark] class TaskSetManager(
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
       successfulTaskDurations.insert(info.duration)
+      inefficientTaskCalculator.foreach { inefficientTask =>
+        inefficientTask.updateTaskProgressThreshold(result)
+        inefficientTask.removeRuningTasksProgressRate(tid)

Review Comment:
   nit: Rename `inefficientTask` -> `calculator` here ?
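   For illustration, a minimal sketch of the renamed block (same code as the diff above, only the lambda parameter name changes):
   ```scala
   // Sketch of the suggested rename; `calculator` is just the proposed local name.
   inefficientTaskCalculator.foreach { calculator =>
     calculator.updateTaskProgressThreshold(result)
     calculator.removeRuningTasksProgressRate(tid)
   }
   ```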



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1260,71 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTaskCalculator {
+    private var allTotalRecordsRead = 0L
+    private var allTotalExecutorRunTime = 0L
+    @volatile private var successTaskProgressThreshold = 0.0D
+    private val runingTasksProgressRate = new ConcurrentHashMap[Long, Double]()
+
+    private[scheduler] def updateTaskProgressThreshold(result: DirectTaskResult[_]): Unit = {
+      var totalRecordsRead = 0L
+      var totalExecutorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          totalRecordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          totalExecutorRunTime = acc.value
+        }
+      }
+      allTotalRecordsRead += totalRecordsRead
+      allTotalExecutorRunTime += totalExecutorRunTime
+      if (allTotalRecordsRead > 0 && allTotalExecutorRunTime > 0) {
+        successTaskProgressThreshold = allTotalRecordsRead / (allTotalExecutorRunTime / 1000.0)
+      }
+    }
+
+    private[scheduler] def updateRuningTasksProgressRate(

Review Comment:
   Other than this method, all other methods in this class can be `private[TaskSetManager]` to limit visibility.
   @Ngone51's comment about `synchronized` access to the state will also become clearer in that case, since all use from `TaskSetManager` would be within the scheduler-protected lock.
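   For illustration, a hedged sketch of the tightened visibility (bodies elided; only the method commented on here stays `private[scheduler]`):
   ```scala
   // Sketch only: narrow everything except updateRuningTasksProgressRate to
   // private[TaskSetManager], so all other access stays under the scheduler lock.
   private[scheduler] class InefficientTaskCalculator {
     private[TaskSetManager] def updateTaskProgressThreshold(result: DirectTaskResult[_]): Unit = {}
     private[scheduler] def updateRuningTasksProgressRate(taskId: Long, taskProgressRate: Double): Unit = {}
     private[TaskSetManager] def removeRuningTasksProgressRate(taskId: Long): Unit = {}
     private[TaskSetManager] def maySpeculateTask(tid: Long, runtimeMs: Long, taskInfo: TaskInfo): Boolean = true
   }
   ```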



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
    * Check if the task associated with the given tid has past the time threshold and should be
    * speculative run.
    */
-  private def checkAndSubmitSpeculatableTask(
-      tid: Long,
+  private def checkAndSubmitSpeculatableTasks(
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      if (!successful(index) && copiesRunning(index) == 1 && !speculatableTasks.contains(index)) {
+        val runtimeMs = info.timeRunning(currentTimeMillis)
+
+        def checkMaySpeculate(): Boolean = {
+          if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
+            true
+          } else {
+            val longTimeTask = (numTasks <= 1) ||
+              runtimeMs > efficientTaskDurationFactor * threshold
+            longTimeTask || inefficientTaskCalculator.get.maySpeculateTask(tid, runtimeMs, info)
+          }
+        }
+
+        val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate()
+        val executorDecommissionSpeculate =
+          (!maySpeculate &&
+            executorDecommissionKillInterval.isDefined && !successfulTaskDurations.isEmpty()) && {
+            val taskInfo = taskInfos(tid)
+            val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
+            decomState.isDefined && {
+              // Check if this task might finish after this executor is decommissioned.
+              // We estimate the task's finish time by using the median task duration.
+              // Whereas the time when the executor might be decommissioned is estimated using the
+              // config executorDecommissionKillInterval. If the task is going to finish after
+              // decommissioning, then we will eagerly speculate the task.
+              val taskEndTimeBasedOnMedianDuration =
+              taskInfos(tid).launchTime + successfulTaskDurations.median
+              val executorDecomTime =
+                decomState.get.startTime + executorDecommissionKillInterval.get
+              executorDecomTime < taskEndTimeBasedOnMedianDuration
+            }
+          }
+        val speculated = maySpeculate || executorDecommissionSpeculate

Review Comment:
   Pull the executor decommissioning related logic into a private method?
   So we have
   ```
   val speculated = (runtimeMs > threshold && checkMaySpeculate()) ||
       shouldSpeculateForExecutorDecommissioning(info)
   ```
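   For illustration, a hedged sketch of what the extracted helper could look like; the body is just the decommission check from the diff above, and the method name is only the one proposed here, not something that already exists in the file:
   ```scala
   // Sketch of the proposed private helper; mirrors the existing decommission check.
   private def shouldSpeculateForExecutorDecommissioning(info: TaskInfo): Boolean = {
     executorDecommissionKillInterval.isDefined && !successfulTaskDurations.isEmpty() &&
       sched.getExecutorDecommissionState(info.executorId).exists { decomState =>
         // Speculate eagerly if the task is estimated (via the median successful task
         // duration) to finish after the executor is expected to be decommissioned.
         val taskEndTimeBasedOnMedianDuration = info.launchTime + successfulTaskDurations.median
         val executorDecomTime = decomState.startTime + executorDecommissionKillInterval.get
         executorDecomTime < taskEndTimeBasedOnMedianDuration
       }
   }
   ```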



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,40 +1162,29 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
-      val medianDuration = successfulTaskDurations.median
-      val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
-      // TODO: Threshold should also look at standard deviation of task durations and have a lower
-      // bound based on that.
-      logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is decommissioned.
-            // We estimate the task's finish time by using the median task duration.
-            // Whereas the time when the executor might be decommissioned is estimated using the
-            // config executorDecommissionKillInterval. If the task is going to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
-            val executorDecomTime = decomState.get.startTime + executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
-        }
-        foundTasks |= speculated
+    val timeMs = clock.getTimeMillis()
+    if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {
+      val threshold = if (numSuccessfulTasks <= 0 && isSpeculationThresholdSpecified) {
+        speculationTaskDurationThresOpt.get
+      } else {
+        val medianDuration = successfulTaskDurations.median
+        speculationMultiplier * medianDuration
       }
-    } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
-      val time = clock.getTimeMillis()
-      val threshold = speculationTaskDurationThresOpt.get
+      val newThreshold = max(threshold, minTimeToSpeculation)
+        // bound based on that.
+      logDebug("Task length threshold for speculation: " + newThreshold)
+      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, newThreshold, numSuccessfulTasks)
+    } else if (isSpeculationThresholdSpecified && speculationTasksLessEqToSlots) {
+      val threshold = max(speculationTaskDurationThresOpt.get, minTimeToSpeculation)
       logDebug(s"Tasks taking longer time than provided speculation threshold: $threshold")
-      for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, numSuccessfulTasks,
+        customizedThreshold = true)
+    }
+    // avoid more warning logs.
+    if (foundTasks) {
+      val elapsedMs = clock.getTimeMillis() - timeMs
+      if (elapsedMs > minTimeToSpeculation) {
+        logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms > ${minTimeToSpeculation}ms")
       }

Review Comment:
   One of the things that is slightly difficult to reason about in this PR is the change in semantics around `numTasks == 1`. Can we preserve the existing behavior w.r.t. `numTasks == 1`? We can expand the functionality related to it in a follow-up PR that focuses only on that change in behavior.
   
   That is, we remove all the special casing for `numTasks == 1` introduced in this change.
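   For reference, a minimal sketch of the guard with that special casing dropped, keeping the rest of the new structure but reverting to the pre-existing entry conditions shown in the removed lines above:
   ```scala
   // Sketch only: same entry conditions as before this PR, no `numTasks == 1` branch.
   if (numSuccessfulTasks >= minFinishedForSpeculation) {
     val threshold = max(speculationMultiplier * successfulTaskDurations.median, minTimeToSpeculation)
     logDebug("Task length threshold for speculation: " + threshold)
     foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, numSuccessfulTasks)
   } else if (isSpeculationThresholdSpecified && speculationTasksLessEqToSlots) {
     val threshold = speculationTaskDurationThresOpt.get
     logDebug(s"Tasks taking longer time than provided speculation threshold: $threshold")
     foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, numSuccessfulTasks,
       customizedThreshold = true)
   }
   ```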



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1260,71 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTaskCalculator {
+    private var allTotalRecordsRead = 0L
+    private var allTotalExecutorRunTime = 0L
+    @volatile private var successTaskProgressThreshold = 0.0D
+    private val runingTasksProgressRate = new ConcurrentHashMap[Long, Double]()
+
+    private[scheduler] def updateTaskProgressThreshold(result: DirectTaskResult[_]): Unit = {
+      var totalRecordsRead = 0L
+      var totalExecutorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          totalRecordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          totalExecutorRunTime = acc.value
+        }
+      }
+      allTotalRecordsRead += totalRecordsRead
+      allTotalExecutorRunTime += totalExecutorRunTime
+      if (allTotalRecordsRead > 0 && allTotalExecutorRunTime > 0) {
+        successTaskProgressThreshold = allTotalRecordsRead / (allTotalExecutorRunTime / 1000.0)
+      }
+    }
+
+    private[scheduler] def updateRuningTasksProgressRate(
+        taskId: Long,
+        taskProgressRate: Double): Unit = {
+      runingTasksProgressRate.put(taskId, taskProgressRate)
+    }
+
+    private[scheduler] def removeRuningTasksProgressRate(taskId: Long): Unit = {
+      runingTasksProgressRate.remove(taskId)
+    }
+
+    private[scheduler] def maySpeculateTask(
+        tid: Long,
+        runtimeMs: Long,
+        taskInfo: TaskInfo): Boolean = {
+      // Only check inefficient tasks when successTaskProgressThreshold > 0, because some stage
+      // tasks may have neither input records nor shuffleRead records, so the
+      // successTaskProgressThreshold may be zero all the time, this case we should make sure
+      // it can be speculated. eg: some spark-sql like that 'msck repair table' or 'drop table'
+      // and so on.
+      lazy val currentTaskProgressRate = runingTasksProgressRate.getOrDefault(tid, 0.0)
+      if (successTaskProgressThreshold <= 0.0 || currentTaskProgressRate <= 0.0) {

Review Comment:
   nit: Reorganize code so that we remove use of `lazy val` ?
   Except for the case of `successTaskProgressThreshold <= 0`, we always use this immediately.
   ```suggestion
         if (successTaskProgressThreshold <= 0.0) return true

         val currentTaskProgressRate = runingTasksProgressRate.getOrDefault(tid, 0.0)
         if (currentTaskProgressRate <= 0.0) {
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
    * Check if the task associated with the given tid has past the time threshold and should be
    * speculative run.
    */
-  private def checkAndSubmitSpeculatableTask(
-      tid: Long,
+  private def checkAndSubmitSpeculatableTasks(
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      if (!successful(index) && copiesRunning(index) == 1 && !speculatableTasks.contains(index)) {
+        val runtimeMs = info.timeRunning(currentTimeMillis)
+
+        def checkMaySpeculate(): Boolean = {
+          if (customizedThreshold || !inefficientTaskCalculator.isDefined) {

Review Comment:
   nit: `!inefficientTaskCalculator.isDefined` -> `inefficientTaskCalculator.isEmpty`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

