Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r877700761
########## core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala: ########## @@ -769,6 +785,25 @@ private[spark] class TaskSetManager( } } + def setTaskRecordsAndRunTime( + info: TaskInfo, + result: DirectTaskResult[_]): Unit = { + var records = 0L + var runTime = 0L + result.accumUpdates.foreach { a => + if (a.name == Some(shuffleRead.RECORDS_READ) || + a.name == Some(input.RECORDS_READ)) { + val acc = a.asInstanceOf[LongAccumulator] + records += acc.value + } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) { + val acc = a.asInstanceOf[LongAccumulator] + runTime = acc.value + } + } + info.setRecords(records) + info.setRunTime(runTime) Review Comment: Same here.. I think we centralize the calculation of this stuff into `InefficientTaskCalculator`. ########## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ########## @@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl( executorUpdates) } + private def getTaskAccumulableInfosAndProgressRate( + updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = { + var records = 0L + var runTime = 0L + val accInfos = updates.map { acc => + if (calculateTaskProgressRate && acc.name.isDefined) { + val name = acc.name.get + if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) { + records += acc.value.asInstanceOf[Long] + } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) { + runTime = acc.value.asInstanceOf[Long] + } + } + acc.toInfo(Some(acc.value), None) + } + val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) { + records / (runTime / 1000.0) + } else { + 0.0D + } Review Comment: Can we centralize the calculation of task progress rate to the `InefficientTaskCalculator` only? It seems not each calculation is necessary here since the speculation check only happens under certain conditions, e.g., `numSuccessfulTasks >= minFinishedForSpeculation`. And I think we can reuse the existing`TaskInfo._accumulables` directly, which could make cold cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org