[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17942



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116148995
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
--- End diff --

Missed this comment.
LGTM. Thanks for clarifying @zsxwing 



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116148841
  
--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
   extends RuntimeException {
 
   override def getMessage: String = {
-if (errorMessages.size == 1) {
--- End diff --

Thanks for clarifying!



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116143769
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
--- End diff --

@mridulm there is a `completed` flag in `markTaskCompleted`.
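
To make the no-op behavior concrete, here is a minimal sketch of the guard being described (a hypothetical, heavily simplified stand-in for `TaskContextImpl`, not the actual Spark class):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mini task context: a `completed` flag makes markTaskCompleted
// idempotent, so only the first invocation runs the listeners.
class MiniTaskContext {
  private var completed = false
  private val onCompleteCallbacks = new ArrayBuffer[Option[Throwable] => Unit]

  def addTaskCompletionListener(f: Option[Throwable] => Unit): Unit =
    onCompleteCallbacks += f

  def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
    if (!completed) {
      completed = true
      onCompleteCallbacks.foreach(_.apply(error)) // listeners see the first call's error
    }
  }
}
```

Under this sketch, `markTaskCompleted(Some(e))` on the failure path runs the listeners with the error, and the later `markTaskCompleted(None)` in the `finally` block is swallowed by the flag, so listeners never observe a second, error-less invocation.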



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116143097
  
--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
   extends RuntimeException {
 
   override def getMessage: String = {
-if (errorMessages.size == 1) {
--- End diff --

It's a common pattern in Scala.
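
For readers unfamiliar with the convention: a lower-camel-case file name is commonly used when a Scala file groups several related types rather than defining a single class with the same name. A hypothetical sketch of the shape (simplified signatures, not the real Spark traits):

```scala
// taskListeners.scala - one file holding several related listener types, so
// the file name does not match any single class (hypothetical, simplified).
trait TaskCompletionListener {
  def onTaskCompletion(): Unit
}

trait TaskFailureListener {
  def onTaskFailure(error: Throwable): Unit
}

class TaskCompletionListenerException(errorMessages: Seq[String])
  extends RuntimeException
```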



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116139359
  
--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
   extends RuntimeException {
 
   override def getMessage: String = {
-if (errorMessages.size == 1) {
--- End diff --

Aside: I was just curious about the naming - interesting. Is this a common pattern in Spark code?



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116139282
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
--- End diff --

What I meant was, when an exception is thrown, there will be two invocations of `context.markTaskCompleted`: one with the `Throwable` passed in, and another with `None`.

This would be confusing to the listeners, no?



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116063045
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
--- End diff --

> We end up calling markTaskCompleted twice when there is an exception thrown, right?

Yes.

> Perhaps do this one when no Throwable is thrown.

Then if `context.markTaskCompleted(None)` throws an exception, `context.markTaskFailed(e)` will be called, so a TaskFailureListener may be invoked after the TaskCompletionListeners. That is a slight behavior change, and I'm not sure it's safe - someone may depend on the order in which the listeners are called.



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r116062195
  
--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
   extends RuntimeException {
 
   override def getMessage: String = {
-if (errorMessages.size == 1) {
--- End diff --

I guess it's because it's not a class name. cc @rxin since you added this 
file.



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r115930587
  
--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
   extends RuntimeException {
 
   override def getMessage: String = {
-if (errorMessages.size == 1) {
--- End diff --

Good catch!
Btw, the file name should probably have been `TaskListeners.scala` (unrelated to this PR)?



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r115930093
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
--- End diff --

We end up calling `markTaskCompleted` twice when there is an exception thrown, right?
Perhaps only do this one when no `Throwable` is thrown.



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r115884649
  
--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
   extends RuntimeException {
 
   override def getMessage: String = {
-if (errorMessages.size == 1) {
--- End diff --

The error here is because
```
if (...) { ... } else {} + {}
```
is equivalent to
```
if (...) { ... } else {   {} + {}  }
```
which is wrong.
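
A minimal, self-contained sketch of the same pitfall (hypothetical strings, not the actual `getMessage` code):

```scala
// The `+ suffix` attaches to the else branch only, so the suffix is
// silently dropped whenever the if branch is taken.
object IfElsePlusGotcha {
  def main(args: Array[String]): Unit = {
    val suffix = "\nPrevious exception in task: ..."

    def buggy(single: Boolean): String =
      if (single) {
        "one error"
      } else {
        "many errors"
      } + suffix // parsed as: else ({ "many errors" } + suffix)

    println(buggy(single = true))  // prints "one error" - suffix lost
    println(buggy(single = false)) // prints "many errors" plus the suffix

    // The fix: evaluate the if/else first, then append.
    def fixed(single: Boolean): String = {
      val base = if (single) "one error" else "many errors"
      base + suffix
    }
    println(fixed(single = true))  // suffix kept on both branches
  }
}
```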



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r115884037
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
--- End diff --

Just add `try...finally` to wrap this line and fix the style



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17942#discussion_r115883997
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
           case t: Throwable =>
             e.addSuppressed(t)
         }
+        context.markTaskCompleted(Some(e))
         throw e
     } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
       try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
      } finally {
-        // Though we unset the ThreadLocal here, the context member variable itself is still queried
-        // directly in the TaskRunner to check for FetchFailedExceptions.
-        TaskContext.unset()
+        try {
--- End diff --

Just add `try...finally` and fix the style
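
The shape being asked for, as a minimal runnable sketch (the printed strings are hypothetical stand-ins for the real cleanup steps in the diff):

```scala
// Nested try...finally: each cleanup step runs even if the previous one
// throws, and the innermost finally (the TaskContext.unset() stand-in)
// always executes last.
object NestedFinallySketch {
  def main(args: Array[String]): Unit = {
    def runTaskBody(): Unit = throw new RuntimeException("task failed") // stand-in

    try {
      try {
        runTaskBody()
      } finally {
        try {
          println("run completion callbacks") // stand-in for markTaskCompleted(None)
        } finally {
          try {
            println("release unroll memory")  // stand-in for the memory cleanup
          } finally {
            println("TaskContext.unset()")    // always reached, even on failure
          }
        }
      }
    } catch {
      case e: RuntimeException => println(s"rethrown: ${e.getMessage}")
    }
  }
}
```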



[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...

2017-05-10 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/17942

[SPARK-20702][Core]TaskContextImpl.markTaskCompleted should not hide the original error

## What changes were proposed in this pull request?

This PR adds an `error` parameter to `TaskContextImpl.markTaskCompleted` to propagate the original error.

It also fixes an issue where `TaskCompletionListenerException.getMessage` doesn't include `previousError`.
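
For the second fix, a hedged sketch of the corrected `getMessage` (the class and field names follow the quoted diff; the body is a simplified reconstruction, not the exact patch):

```scala
// Simplified reconstruction: binding the if/else result to a val first means
// the previousError suffix is appended on BOTH branches; the original code
// let `+` bind into the else branch, dropping the suffix when size == 1.
class TaskCompletionListenerException(
    errorMessages: Seq[String],
    previousError: Option[Throwable] = None)
  extends RuntimeException {

  override def getMessage: String = {
    val listenerErrorMessage =
      if (errorMessages.size == 1) {
        errorMessages.head
      } else {
        errorMessages.zipWithIndex
          .map { case (msg, i) => s"Exception $i: $msg" }
          .mkString("\n")
      }
    val previousErrorMessage = previousError.map { e =>
      "\n\nPrevious exception in task: " + e.getMessage
    }.getOrElse("")
    listenerErrorMessage + previousErrorMessage
  }
}
```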

## How was this patch tested?

New unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-20702

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17942.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17942


commit 5c500686ad91e1c95e1b2fe9985686a813ff1d12
Author: Shixiong Zhu 
Date:   2017-05-10T20:55:22Z

Rethrow the original error



