Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r895722949


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1218,6 +1249,71 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = 0.0D
+    private val runingTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+    private[TaskSetManager] def updateAvgTaskProcessRate(
+        taskId: Long,
+        result: DirectTaskResult[_]): Unit = {
+      var recordsRead = 0L
+      var executorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          recordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          executorRunTime = acc.value
+        }
+      }
+      totalRecordsRead += recordsRead
+      totalExecutorRunTime += executorRunTime
+      if (totalRecordsRead > 0 && totalExecutorRunTime > 0) {
+        avgTaskProcessRate = totalRecordsRead / (totalExecutorRunTime / 1000.0)
+      }
+      runingTasksProcessRate.remove(taskId)
+    }
+
+    private[scheduler] def updateRuningTaskProcessRate(
+        taskId: Long,
+        taskProcessRate: Double): Unit = {
+      runingTasksProcessRate.put(taskId, taskProcessRate)
+    }
+
+    private[TaskSetManager] def isInefficient(
+        tid: Long,
+        runtimeMs: Long,
+        taskInfo: TaskInfo): Boolean = {
+      // Only check inefficient tasks when avgTaskProcessRate > 0, because 
some stage
+      // tasks may have neither input records nor shuffleRead records, so the
+      // avgTaskProcessRate may be zero all the time, this case we should make 
sure
+      // it can be speculated. eg: some spark-sql like that 'msck repair 
table' or 'drop table'
+      // and so on.
+      if (avgTaskProcessRate <= 0.0) return true
+      val currentTaskProcessRate = runingTasksProcessRate.getOrDefault(tid, 
0.0)
+      if (currentTaskProcessRate <= 0.0) {
+        true
+      } else {
+        val taskProcessThreshold = avgTaskProcessRate * 
efficientTaskProcessMultiplier
+        val isInefficientTask = currentTaskProcessRate < taskProcessThreshold
+        if (isInefficientTask) {
+          logInfo(s"Marking task ${taskInfo.index} in stage ${taskSet.id} " +
+            s"(on ${taskInfo.host}) as speculatable because it ran 
${runtimeMs}ms and " +

Review Comment:
   "as speculatable" -> "as inenfficient"?
   
    I think only the caller should decide whether this task should be 
specaluted or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to