Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r877700761


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -769,6 +785,25 @@ private[spark] class TaskSetManager(
     }
   }
 
+  def setTaskRecordsAndRunTime(
+      info: TaskInfo,
+      result: DirectTaskResult[_]): Unit = {
+    var records = 0L
+    var runTime = 0L
+    result.accumUpdates.foreach { a =>
+      if (a.name == Some(shuffleRead.RECORDS_READ) ||
+        a.name == Some(input.RECORDS_READ)) {
+        val acc = a.asInstanceOf[LongAccumulator]
+        records += acc.value
+      } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+        val acc = a.asInstanceOf[LongAccumulator]
+        runTime = acc.value
+      }
+    }
+    info.setRecords(records)
+    info.setRunTime(runTime)

Review Comment:
   Same here... I think we should centralize the calculation of these metrics
in `InefficientTaskCalculator`.
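
   For illustration only, a rough sketch of what that centralization could
look like. The object shape and the helper name `extractRecordsAndRunTime`
are my assumptions, not code from this PR; it parses `AccumulableInfo`s
rather than raw accumulator updates so the same helper could serve both call
sites:

   ```scala
   // Hypothetical sketch (Spark-internal code, so it can see
   // org.apache.spark.InternalAccumulator): one shared place that pulls the
   // records-read and executor-run-time metrics out of a task's accumulables.
   import org.apache.spark.InternalAccumulator
   import org.apache.spark.InternalAccumulator.{input, shuffleRead}
   import org.apache.spark.scheduler.AccumulableInfo

   object InefficientTaskCalculator {
     // Sums the shuffle-read and input records-read accumulators and picks
     // up the executor run time (in ms) from a task's accumulable infos.
     def extractRecordsAndRunTime(accums: Seq[AccumulableInfo]): (Long, Long) = {
       var records = 0L
       var runTime = 0L
       accums.foreach { a =>
         a.name match {
           case Some(n) if n == shuffleRead.RECORDS_READ || n == input.RECORDS_READ =>
             records += a.value.map(_.asInstanceOf[Long]).getOrElse(0L)
           case Some(InternalAccumulator.EXECUTOR_RUN_TIME) =>
             runTime = a.value.map(_.asInstanceOf[Long]).getOrElse(0L)
           case _ =>
         }
       }
       (records, runTime)
     }
   }
   ```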



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }
 
+ private def getTaskAccumulableInfosAndProgressRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+   var records = 0L
+   var runTime = 0L
+   val accInfos = updates.map { acc =>
+     if (calculateTaskProgressRate && acc.name.isDefined) {
+       val name = acc.name.get
+       if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+         records += acc.value.asInstanceOf[Long]
+       } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+         runTime = acc.value.asInstanceOf[Long]
+       }
+     }
+     acc.toInfo(Some(acc.value), None)
+   }
+   val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+     records / (runTime / 1000.0)
+   } else {
+     0.0D
+   }

Review Comment:
   Can we centralize the calculation of the task progress rate in
`InefficientTaskCalculator` only? It seems that not every calculation is
necessary here, since the speculation check only happens under certain
conditions, e.g., `numSuccessfulTasks >= minFinishedForSpeculation`. And I
think we can reuse the existing `TaskInfo._accumulables` directly, which
could make the code cleaner.
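
   For illustration only, a rough sketch of computing the rate on demand from
`TaskInfo.accumulables` (the public view of `_accumulables`), reusing the
hypothetical helper sketched in the other comment; `taskProgressRate` is an
assumed name, not code from this PR:

   ```scala
   // Hypothetical sketch: derive the progress rate only when the speculation
   // check actually runs, from the accumulables already recorded on TaskInfo,
   // instead of re-parsing every accumulator update on each heartbeat.
   import org.apache.spark.scheduler.TaskInfo

   def taskProgressRate(info: TaskInfo): Double = {
     val (records, runTime) =
       InefficientTaskCalculator.extractRecordsAndRunTime(info.accumulables)
     // Records per second of executor run time; 0.0 when no run time yet.
     if (runTime > 0) records / (runTime / 1000.0) else 0.0
   }
   ```

   This would also let the heartbeat path drop the
`(Seq[AccumulableInfo], Double)` return value: it just builds the
`AccumulableInfo`s, and the rate is computed where it is consumed.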



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

