[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21165#discussion_r184441076
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1417,10 +1417,13 @@ class DAGScheduler(
   case exceptionFailure: ExceptionFailure =>
  // Nothing left to do, already handled above for accumulator updates.
 
+  case _: TaskKilled =>
--- End diff --

will do.


---




[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21165#discussion_r184440128
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
-  override def toErrorString: String = s"TaskKilled ($reason)"
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
+  extends TaskFailedReason {
+
+  override def toErrorString: String = "TaskKilled ($reason)"
--- End diff --

Will do.

Didn't notice the `s` was missing.


---




[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21165#discussion_r184438550
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1417,10 +1417,13 @@ class DAGScheduler(
   case exceptionFailure: ExceptionFailure =>
  // Nothing left to do, already handled above for accumulator updates.
 
+  case _: TaskKilled =>
--- End diff --

nit: combine this with the `ExceptionFailure` case.
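
For reference, Scala pattern alternatives let the two arms share one body; a minimal sketch of the suggested combination, with the surrounding match assumed from the diff above:

```scala
case _: ExceptionFailure | _: TaskKilled =>
  // Nothing left to do, already handled above for accumulator updates.
```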


---




[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21165#discussion_r184436634
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
-  override def toErrorString: String = s"TaskKilled ($reason)"
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
+  extends TaskFailedReason {
+
+  override def toErrorString: String = "TaskKilled ($reason)"
--- End diff --

`s"TaskKilled ($reason)"`


---




[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21165#discussion_r184429082
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -287,6 +287,28 @@ private[spark] class Executor(
   notifyAll()
 }
 
+    /**
+     * Utility function to:
+     *   1. Report executor runtime and JVM gc time if possible
+     *   2. Collect accumulator updates
+     *   3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
--- End diff --

@squito after addressing your comment, do you think we should come up with a
more specific method name?
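
For readers following the thread, a minimal sketch of what such a helper could look like, assuming `task`, `computeTotalGcTime`, `startGCTime`, `setTaskFinishedAndClearInterruptStatus`, and `Task.collectAccumulatorUpdates` from the surrounding `TaskRunner`; this illustrates the three documented steps, not the PR's final body:

```scala
private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
  // 1. Report executor runtime and JVM gc time if the task instance is still valid.
  if (task != null) {
    task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
  }
  // 2. Collect accumulator updates to ship back with the failure end reason.
  val accums =
    if (task != null) task.collectAccumulatorUpdates(taskFailed = true) else Nil
  val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
  // 3. Mark the task finished and clear this thread's interrupt status.
  setTaskFinishedAndClearInterruptStatus()
  (accums, accUpdates)
}
```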


---




[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21165#discussion_r184404291
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -287,6 +287,33 @@ private[spark] class Executor(
   notifyAll()
 }
 
+    /**
+     * Set executor runtime and JVM gc time if task instance is still valid
+     */
+    private def reportGCAndExecutorTimeIfPossible(taskStart: Long): Unit = {
+      if (task != null) {
+        task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+        task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+      }
+    }
+
+    /**
+     * Utility function to:
+     *   1. Report executor runtime and JVM gc time if possible
+     *   2. Collect accumulator updates
+     *   3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+      reportGCAndExecutorTimeIfPossible(taskStart)
--- End diff --

I don't think the extra `reportGCAndExecutorTimeIfPossible` is necessary; you
can just inline it. Also, the original `if (task != null)` is probably easier
to follow than `Option(task).map`.
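
To make the style point concrete, the two equivalent forms (names assumed from the diff above):

```scala
// Plain null check, as in the original: the intent reads directly.
if (task != null) {
  task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
  task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
}

// Option-based equivalent; note foreach, not map, since the result is discarded.
Option(task).foreach { t =>
  t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
  t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
}
```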


---




[GitHub] spark pull request #21165: Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

2018-04-26 Thread advancedxy
GitHub user advancedxy opened a pull request:

https://github.com/apache/spark/pull/21165

Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason

## What changes were proposed in this pull request?
The ultimate goal is for `onTaskEnd` listeners to receive metrics when a task
is killed intentionally, since that data is currently just thrown away. This
is already done for `ExceptionFailure`, so this just copies the same approach.
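
Concretely, the executor's `TaskKilledException` handler collects accumulator updates and attaches them to the `TaskKilled` end reason, mirroring the `ExceptionFailure` path. A hedged sketch of the handler shape (names such as `taskName`, `taskId`, `execBackend`, and `ser` assumed from `TaskRunner`; not the exact patch):

```scala
case t: TaskKilledException =>
  logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
  // Gather metrics/accumulators instead of throwing them away, then
  // report them to the driver along with the kill reason.
  val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
  val reason = TaskKilled(t.reason, accUpdates, accums)
  execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
```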

## How was this patch tested?
Updated existing tests.

This is a rework of https://github.com/apache/spark/pull/17422; all credit
should go to @noodle-fb

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/advancedxy/spark SPARK-20087

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21165.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21165


commit 6262c52f7eb4abcf06742e9afd9d1454f06cdf1f
Author: Charles Lewis 
Date:   2017-03-22T20:33:55Z

report metrics for killed tasks

commit 18308895ad07c9c757e96e08f5c40b5dcaaf3455
Author: Charles Lewis 
Date:   2017-03-24T19:06:46Z

add task killed to exception accum test

commit cb276bc1f1583ca7dcf44b8e80a9fc2cd09953cf
Author: Charles Lewis 
Date:   2017-03-24T23:20:59Z

extra fixes for task killed reason merge

commit 30ae1457afd9899a5fa937c2fefac31f6a0752ed
Author: Xianjin YE 
Date:   2018-04-26T07:34:54Z

Fix merge conflict and semantic difference

commit 88b1cebd76d7414d4bbdb99e03fe10f74e25029a
Author: Xianjin YE 
Date:   2018-04-26T09:22:20Z

Make accums in TaskKilled immutable and extract
common logic in TaskRunner to reduce duplicate code




---
