[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17422 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17422#discussion_r165772194 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -429,15 +429,42 @@ private[spark] class Executor( case t: TaskKilledException => logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}") + + // Collect latest accumulator values to report back to the driver + val accums: Seq[AccumulatorV2[_, _]] = +if (task != null) { + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.collectAccumulatorUpdates(taskFailed = true) +} else { + Seq.empty +} + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) --- End diff -- this should be refactored, and not repeated 3 times. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17422#discussion_r165772055 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -212,9 +212,19 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. */ @DeveloperApi -case class TaskKilled(reason: String) extends TaskFailedReason { - override def toErrorString: String = s"TaskKilled ($reason)" +case class TaskKilled( +reason: String, +accumUpdates: Seq[AccumulableInfo] = Seq.empty, +private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil) + extends TaskFailedReason { + + override def toErrorString: String = "TaskKilled ($reason)" override def countTowardsTaskFailures: Boolean = false + + private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): TaskKilled = { --- End diff -- I don't think this method is really necessary at all, you could just pass it in the constructor in the places its used. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org