Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r899037493


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -863,6 +872,30 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }
 
+ private def getTaskAccumulableInfosAndProcessRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {

Review Comment:
   nit: 4 indents
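
   For illustration, a sketch of the layout this nit suggests, assuming the usual Spark style of a 4-space continuation indent for multi-line signatures:

   ```scala
     private def getTaskAccumulableInfosAndProcessRate(
         updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
   ```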



##########
core/src/main/scala/org/apache/spark/executor/InputMetrics.scala:
##########
@@ -56,4 +56,5 @@ class InputMetrics private[spark] () extends Serializable {
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
+  private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)

Review Comment:
   test only?
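
   If it is only needed from tests, the usage would look roughly like this (a sketch; assumes the caller lives under `org.apache.spark` so the `private[spark]` setter is visible):

   ```scala
   import org.apache.spark.executor.TaskMetrics

   // Hypothetical test setup: seed deterministic input metrics for assertions.
   val metrics = TaskMetrics.empty
   metrics.inputMetrics.setRecordsRead(100L)
   assert(metrics.inputMetrics.recordsRead == 100L)
   ```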



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -863,6 +872,30 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }
 
+ private def getTaskAccumulableInfosAndProcessRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+   var recordsRead = 0L
+   var executorRunTime = 0L
+   val accInfos = updates.map { acc =>
+     if (efficientTaskCalculationEnabled && acc.name.isDefined) {
+       val name = acc.name.get
+       if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+         recordsRead += acc.value.asInstanceOf[Long]
+       } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+         executorRunTime = acc.value.asInstanceOf[Long]
+       }
+     }
+     acc.toInfo(Some(acc.value), None)
+   }
+   val taskProcessRate = if (efficientTaskCalculationEnabled &&
+     executorRunTime > 0 && recordsRead > 0) {
+     recordsRead / (executorRunTime / 1000.0)
+   } else {
+     0.0D
+   }

Review Comment:
   Extract this code block to a common method to be reused in `updateAvgTaskProcessRate` as well?
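
   For illustration, one possible shape for the shared helper (the name `calculateTaskProcessRate` is just a suggestion):

   ```scala
   // Records per second, guarding against a zero or missing run time.
   // executorRunTime is in milliseconds.
   private def calculateTaskProcessRate(recordsRead: Long, executorRunTime: Long): Double = {
     if (executorRunTime > 0 && recordsRead > 0) {
       recordsRead / (executorRunTime / 1000.0)
     } else {
       0.0D
     }
   }
   ```

   Then both this method and `updateAvgTaskProcessRate` could call it instead of duplicating the branch.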



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processMultiplier")
+      .doc("A multiplier for evaluating the efficiency of task processing. A 
task is inefficient " +
+        "when its data process rate is less than the average data process rate 
of all " +
+        "successful tasks in the stage multiplied by the multiplier.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+      .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.efficiency.durationFactor")
+      .doc(s"When a task duration is large than the factor multiplied by the 
threshold which " +
+        s"may be ${SPECULATION_MULTIPLIER.key} * 
successfulTaskDurations.median or " +
+        s"${SPECULATION_TASK_DURATION_THRESHOLD.key}, and it should be 
considered for " +

Review Comment:
   Is that true? I thought it should be `SPECULATION_MIN_THRESHOLD`? When `SPECULATION_TASK_DURATION_THRESHOLD` is defined, IIUC, `customizedThreshold` is true, so there's no chance to apply the factor.
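
   For reference, a rough sketch of the threshold selection as I read it (names approximate, not the exact `TaskSetManager` code):

   ```scala
   // When SPECULATION_TASK_DURATION_THRESHOLD is set, customizedThreshold is true
   // and the duration factor never gets a chance to apply; otherwise the derived
   // threshold is floored by SPECULATION_MIN_THRESHOLD, which is why the doc
   // should mention that config instead.
   val threshold =
     if (customizedThreshold) {
       taskDurationThresholdMs
     } else {
       math.max(speculationMultiplier * medianDurationMs, minTimeToSpeculation)
     }
   ```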


