[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'Ta...
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21165#discussion_r184441076

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1417,10 +1417,13 @@ class DAGScheduler(
           case exceptionFailure: ExceptionFailure =>
             // Nothing left to do, already handled above for accumulator updates.

    +      case _: TaskKilled =>
    --- End diff --

    will do.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21165#discussion_r184440128

    --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
    @@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
      * Task was killed intentionally and needs to be rescheduled.
      */
     @DeveloperApi
    -case class TaskKilled(reason: String) extends TaskFailedReason {
    -  override def toErrorString: String = s"TaskKilled ($reason)"
    +case class TaskKilled(
    +    reason: String,
    +    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    +    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
    +  extends TaskFailedReason {
    +
    +  override def toErrorString: String = "TaskKilled ($reason)"
    --- End diff --

    Will do. I didn't notice the `s` was missing.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21165#discussion_r184438550

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1417,10 +1417,13 @@ class DAGScheduler(
           case exceptionFailure: ExceptionFailure =>
             // Nothing left to do, already handled above for accumulator updates.

    +      case _: TaskKilled =>
    --- End diff --

    nit: combine this with the `ExceptionFailure` case.
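The suggested merge of the two cases can be sketched with a pattern alternative. This is a simplified, self-contained stand-in for the `DAGScheduler` match (the `TaskEndReason` hierarchy here is stubbed, not Spark's real one):

```scala
// Stub hierarchy standing in for Spark's TaskEndReason types.
sealed trait TaskEndReason
case object Success extends TaskEndReason
case class ExceptionFailure(description: String) extends TaskEndReason
case class TaskKilled(reason: String) extends TaskEndReason

def handleTaskCompletion(reason: TaskEndReason): String = reason match {
  case Success =>
    "handle successful completion"
  // Accumulator updates were already applied earlier in the handler, so
  // both failure kinds can share the same no-op branch via `|`.
  case _: ExceptionFailure | _: TaskKilled =>
    "already handled above for accumulator updates"
}
```

Combining the branches keeps the match exhaustive while making it explicit that `TaskKilled` now follows the same accumulator path as `ExceptionFailure`.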
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21165#discussion_r184436634

    --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
    @@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
      * Task was killed intentionally and needs to be rescheduled.
      */
     @DeveloperApi
    -case class TaskKilled(reason: String) extends TaskFailedReason {
    -  override def toErrorString: String = s"TaskKilled ($reason)"
    +case class TaskKilled(
    +    reason: String,
    +    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    +    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
    +  extends TaskFailedReason {
    +
    +  override def toErrorString: String = "TaskKilled ($reason)"
    --- End diff --

    `s"TaskKilled ($reason)"`
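The bug the reviewer flags is the dropped `s` prefix: without it, `"$reason"` is a literal string rather than an interpolation. A minimal self-contained sketch of the fixed case class (`AccumulableInfo` is stubbed here; the real Spark type has more fields):

```scala
// Stub standing in for org.apache.spark.scheduler.AccumulableInfo.
case class AccumulableInfo(id: Long, name: String)

case class TaskKilled(
    reason: String,
    accumUpdates: Seq[AccumulableInfo] = Seq.empty) {

  // With the `s` prefix, $reason is interpolated; without it, the output
  // would be the literal text "TaskKilled ($reason)".
  def toErrorString: String = s"TaskKilled ($reason)"
}
```

The defaulted `accumUpdates` parameter is what lets existing call sites such as `TaskKilled("preempted")` keep compiling unchanged.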
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21165#discussion_r184429082

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -287,6 +287,28 @@ private[spark] class Executor(
         notifyAll()
       }

    +  /**
    +   * Utility function to:
    +   *   1. Report executor runtime and JVM GC time if possible
    +   *   2. Collect accumulator updates
    +   *   3. Set the finished flag to true and clear the current thread's interrupt status
    +   */
    +  private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
    --- End diff --

    @squito after addressing your comment, do you think we should come up with a more specific method name?
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21165#discussion_r184404291

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -287,6 +287,33 @@ private[spark] class Executor(
         notifyAll()
       }

    +  /**
    +   * Set executor runtime and JVM GC time if the task instance is still valid.
    +   */
    +  private def reportGCAndExecutorTimeIfPossible(taskStart: Long): Unit = {
    +    if (task != null) {
    +      task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
    +      task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    +    }
    +  }
    +
    +  /**
    +   * Utility function to:
    +   *   1. Report executor runtime and JVM GC time if possible
    +   *   2. Collect accumulator updates
    +   *   3. Set the finished flag to true and clear the current thread's interrupt status
    +   */
    +  private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
    +    reportGCAndExecutorTimeIfPossible(taskStart)
    --- End diff --

    I don't think the extra `reportGCAndExecutorTimeIfPossible` is necessary; you can just inline it. Also, the original `if (task != null)` is probably easier to follow than `Option(task).map`.
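Inlining the helper as the reviewer suggests might look like the following simplified sketch. The `Task`/`TaskMetrics` types, the mutable `task` field, and the GC-time values here are all stubs standing in for `Executor.TaskRunner` state, not Spark's real API:

```scala
// Stubs standing in for Spark's Task and TaskMetrics, for illustration only.
class TaskMetrics {
  var executorRunTime: Long = 0L
  var jvmGCTime: Long = 0L
}
class Task { val metrics = new TaskMetrics }

// Simplified stand-ins for TaskRunner's fields and helpers.
var task: Task = new Task
val startGCTime: Long = 10L
def computeTotalGcTime(): Long = 42L

def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long): Unit = {
  // The GC/runtime reporting is inlined, using a plain null check rather
  // than Option(task).map, as suggested in the review.
  if (task != null) {
    task.metrics.executorRunTime = System.currentTimeMillis() - taskStart
    task.metrics.jvmGCTime = computeTotalGcTime() - startGCTime
  }
  // ...then collect accumulator updates, set the finished flag, and
  // clear the current thread's interrupt status.
}
```

With the helper gone, the null guard and both metric writes sit in one place, which is the readability point being made.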
GitHub user advancedxy opened a pull request:

    https://github.com/apache/spark/pull/21165

    Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

## What changes were proposed in this pull request?

The ultimate goal is for listeners' `onTaskEnd` callbacks to receive metrics when a task is killed intentionally, since that data is currently just thrown away. This is already done for `ExceptionFailure`, so this just copies the same approach.

## How was this patch tested?

Updated existing tests.

This is a rework of https://github.com/apache/spark/pull/17422; all credits should go to @noodle-fb.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/advancedxy/spark SPARK-20087

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21165.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21165

commit 6262c52f7eb4abcf06742e9afd9d1454f06cdf1f
Author: Charles Lewis
Date: 2017-03-22T20:33:55Z

    report metrics for killed tasks

commit 18308895ad07c9c757e96e08f5c40b5dcaaf3455
Author: Charles Lewis
Date: 2017-03-24T19:06:46Z

    add task killed to exception accum test

commit cb276bc1f1583ca7dcf44b8e80a9fc2cd09953cf
Author: Charles Lewis
Date: 2017-03-24T23:20:59Z

    extra fixes for task killed reason merge

commit 30ae1457afd9899a5fa937c2fefac31f6a0752ed
Author: Xianjin YE
Date: 2018-04-26T07:34:54Z

    Fix merge conflict and semantic difference

commit 88b1cebd76d7414d4bbdb99e03fe10f74e25029a
Author: Xianjin YE
Date: 2018-04-26T09:22:20Z

    Make accums in TaskKilled immutable and extract common logic in TaskRunner to reduce duplicate code
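The end-to-end goal described in the PR — a listener seeing accumulator updates even for killed tasks — can be sketched with stubbed types (this is not Spark's real `SparkListener` API, just a toy model of the dispatch):

```scala
// Stubs standing in for Spark's AccumulableInfo and TaskEndReason types.
case class AccumulableInfo(name: String, update: Long)

sealed trait TaskEndReason
case class ExceptionFailure(description: String,
    accumUpdates: Seq[AccumulableInfo] = Seq.empty) extends TaskEndReason
case class TaskKilled(reason: String,
    accumUpdates: Seq[AccumulableInfo] = Seq.empty) extends TaskEndReason

// Toy "listener": before this change, a killed task's updates were
// dropped; after it, TaskKilled carries them just like ExceptionFailure.
def accumUpdatesSeenByListener(reason: TaskEndReason): Seq[AccumulableInfo] =
  reason match {
    case ExceptionFailure(_, updates) => updates
    case TaskKilled(_, updates)       => updates
  }
```

The point of the PR is exactly that the second branch no longer has to return `Seq.empty`.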