[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...

2018-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17422


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17422#discussion_r165772194
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -429,15 +429,42 @@ private[spark] class Executor(
 
 case t: TaskKilledException =>
   logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
+
+  // Collect latest accumulator values to report back to the driver
+  val accums: Seq[AccumulatorV2[_, _]] =
+if (task != null) {
+  task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+  task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+  task.collectAccumulatorUpdates(taskFailed = true)
+} else {
+  Seq.empty
+}
+  val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
--- End diff --

this should be refactored, and not repeated 3 times.
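To illustrate the reviewer's suggestion, the duplicated accumulator-collection block could be pulled into one helper that each failure branch calls. The sketch below is a self-contained model only: `Acc`, `AccumInfo`, and `Task` are simplified stand-ins, not Spark's real `AccumulatorV2`, `AccumulableInfo`, and `Task` classes.

```scala
// Stand-in types modeling the shapes involved (illustrative, not Spark's API).
case class AccumInfo(name: String, value: Long)

class Acc(val name: String, var value: Long) {
  def toInfo: AccumInfo = AccumInfo(name, value)
}

class Task(val accs: Seq[Acc]) {
  def collectAccumulatorUpdates(): Seq[Acc] = accs
}

object Executor {
  // The repeated block from the diff, extracted once: collect the latest
  // accumulators (if the task exists) and convert them to update infos.
  // Each failure branch (TaskKilledException, etc.) would call this instead
  // of duplicating the logic three times.
  def collectAccums(task: Task): (Seq[Acc], Seq[AccumInfo]) = {
    val accums: Seq[Acc] =
      if (task != null) task.collectAccumulatorUpdates() else Seq.empty
    val accUpdates = accums.map(_.toInfo)
    (accums, accUpdates)
  }
}
```

In the real code the helper would also fold in the `setExecutorRunTime` / `setJvmGCTime` bookkeeping shown in the diff, so the three call sites shrink to a single call each.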


---



[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17422#discussion_r165772055
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,19 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
-  override def toErrorString: String = s"TaskKilled ($reason)"
+case class TaskKilled(
+reason: String,
+accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
+  extends TaskFailedReason {
+
+  override def toErrorString: String = s"TaskKilled ($reason)"
   override def countTowardsTaskFailures: Boolean = false
+
+  private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): TaskKilled = {
--- End diff --

I don't think this method is really necessary at all; you could just pass it in the constructor in the places it's used.
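As a sketch of what the reviewer means: because `accums` is already a constructor parameter with a default value, call sites can supply it directly instead of going through a separate `withAccums` builder. The snippet below uses simplified placeholder field types (`Seq[String]` / `Seq[Long]`), not Spark's real `AccumulableInfo` and `AccumulatorV2`.

```scala
// Simplified stand-in for the TaskKilled case class from the diff.
case class TaskKilled(
    reason: String,
    accumUpdates: Seq[String] = Seq.empty,
    accums: Seq[Long] = Nil) {
  def toErrorString: String = s"TaskKilled ($reason)"
  def countTowardsTaskFailures: Boolean = false
}

object TaskKilledExample {
  // Instead of  TaskKilled(reason).withAccums(accums),
  // construct it in one step using named arguments:
  def build(reason: String, accums: Seq[Long]): TaskKilled =
    TaskKilled(reason, accums = accums)
}
```

Default parameters on a case class make the extra builder method redundant here, since Scala's named arguments let callers set `accums` without also supplying `accumUpdates`.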


---
