[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17942

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116148995

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
--- End diff --

Missed this comment. LGTM. Thanks for clarifying @zsxwing
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116148841

--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
    @@ -55,14 +55,16 @@ class TaskCompletionListenerException(
       extends RuntimeException {

       override def getMessage: String = {
    -    if (errorMessages.size == 1) {
--- End diff --

Thanks for clarifying!
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116143769

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
    +        context.markTaskCompleted(None)
--- End diff --

@mridulm there is a `completed` flag in `markTaskCompleted`.
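For readers following the thread, here is a minimal sketch of how a `completed` flag can make the second call a no-op. It is an illustration only: `SketchTaskContext`, `addCompletionListener` as written here, and `CompletedFlagSketch` are hypothetical stand-ins, not the real `TaskContextImpl`, and the listeners are reduced to plain `() => Unit` callbacks.

```scala
// Hypothetical stand-in for TaskContextImpl; the real class differs.
final class SketchTaskContext {
  private var completed = false
  private val listeners = scala.collection.mutable.ArrayBuffer.empty[() => Unit]

  def addCompletionListener(f: () => Unit): Unit = synchronized {
    listeners += f
  }

  // The first call flips the flag and runs the listeners.
  // Any later call sees completed == true and does nothing.
  def markTaskCompleted(error: Option[Throwable]): Unit = {
    val toRun: List[() => Unit] = synchronized {
      if (completed) {
        Nil
      } else {
        completed = true
        listeners.toList
      }
    }
    toRun.foreach(listener => listener())
  }
}

object CompletedFlagSketch {
  // Counts how many times a single listener fires across two calls.
  def listenerCalls(): Int = {
    val ctx = new SketchTaskContext
    var calls = 0
    ctx.addCompletionListener(() => calls += 1)
    ctx.markTaskCompleted(Some(new RuntimeException("boom"))) // catch path
    ctx.markTaskCompleted(None)                               // finally path, no-op
    calls
  }
}
```

Under these assumptions the listener fires once, so listeners never observe the second invocation that the `finally` block makes.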
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116143097

--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
    @@ -55,14 +55,16 @@ class TaskCompletionListenerException(
       extends RuntimeException {

       override def getMessage: String = {
    -    if (errorMessages.size == 1) {
--- End diff --

It's a common pattern in Scala.
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116139359

--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
    @@ -55,14 +55,16 @@ class TaskCompletionListenerException(
       extends RuntimeException {

       override def getMessage: String = {
    -    if (errorMessages.size == 1) {
--- End diff --

Aside: I was just curious about the naming; interesting. Is this a common pattern in Spark code?
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116139282

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
    +        context.markTaskCompleted(None)
--- End diff --

What I meant was: when an exception is thrown, there will be two invocations of `context.markTaskCompleted`, one with the Throwable passed in and another with None. Wouldn't this be confusing to the listeners?
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116063045

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
    +        context.markTaskCompleted(None)
--- End diff --

> We end up calling markTaskCompleted twice when there is an exception thrown, right?

Yes.

> Perhaps do this one when no Throwable is thrown.

Then if `context.markTaskCompleted(None)` throws an exception, `context.markTaskFailed(e)` will be called, so TaskFailureListener may be called after TaskCompletionListener. This is a slight behavior change. Not sure if it's safe. Someone may depend on the order of calling listeners?
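The ordering concern above can be made concrete with a small sketch. Everything here is hypothetical (`ListenerOrderSketch`, the `Callbacks` helper, and the event names are invented for illustration, and the real `Task.run` is more involved); it only records the order in which the two kinds of callbacks would fire when the completion callback itself throws.

```scala
object ListenerOrderSketch {
  import scala.collection.mutable.ListBuffer

  // Hypothetical callbacks that log their invocation order.
  private class Callbacks {
    val events = ListBuffer.empty[String]
    def runTask(): Unit = { events += "runTask"; () }
    def markTaskFailed(e: Throwable): Unit = { events += "failure"; () }
    // The completion callback fails, as a misbehaving listener might.
    def markTaskCompleted(): Unit = {
      events += "completion"
      throw new IllegalStateException("completion listener failed")
    }
  }

  // Variant suggested in review: completion runs inside the try on the success path,
  // so an exception from it reaches the catch block and triggers the failure callback.
  def completionInsideTry(): List[String] = {
    val cb = new Callbacks
    try {
      try {
        cb.runTask()
        cb.markTaskCompleted()
      } catch {
        case e: Throwable =>
          cb.markTaskFailed(e)
          throw e
      }
    } catch {
      case _: IllegalStateException => () // swallowed so the sketch can return the log
    }
    cb.events.toList
  }

  // Shape kept by the PR: completion runs in finally, outside the catch block,
  // so a failing completion callback does not trigger the failure callback.
  def completionInFinally(): List[String] = {
    val cb = new Callbacks
    try {
      try {
        cb.runTask()
      } catch {
        case e: Throwable =>
          cb.markTaskFailed(e)
          throw e
      } finally {
        cb.markTaskCompleted()
      }
    } catch {
      case _: IllegalStateException => ()
    }
    cb.events.toList
  }
}
```

In the first variant the log ends with "completion" followed by "failure", which is the reordering described above; in the second the failure callback never runs for a task body that succeeded.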
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r116062195

--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
    @@ -55,14 +55,16 @@ class TaskCompletionListenerException(
       extends RuntimeException {

       override def getMessage: String = {
    -    if (errorMessages.size == 1) {
--- End diff --

I guess it's because it's not a class name. cc @rxin since you added this file.
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r115930587

--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
    @@ -55,14 +55,16 @@ class TaskCompletionListenerException(
       extends RuntimeException {

       override def getMessage: String = {
    -    if (errorMessages.size == 1) {
--- End diff --

Good catch! Btw, the file name should probably have been `taskListeners.scala` (unrelated to this PR)?
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r115930093

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
    +        context.markTaskCompleted(None)
--- End diff --

We end up calling `markTaskCompleted` twice when there is an exception thrown, right? Perhaps do this one only when no `Throwable` is thrown.
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r115884649

--- Diff: core/src/main/scala/org/apache/spark/util/taskListeners.scala ---
    @@ -55,14 +55,16 @@ class TaskCompletionListenerException(
       extends RuntimeException {

       override def getMessage: String = {
    -    if (errorMessages.size == 1) {
--- End diff --

The error here is because
```
if (...) {
  ...
} else {} + {}
```
is equivalent to
```
if (...) {
  ...
} else {
  {} + {}
}
```
which is wrong.
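The precedence issue described above is easy to reproduce outside Spark. The following is a sketch with hypothetical names (`IfElsePrecedenceSketch`, `buggy`, `fixed`, and the `suffix` parameter are not from the PR); it shows that a trailing `+` after the closing brace of `else` attaches to the else block only, and one way to avoid that by binding the `if` result to a `val` first.

```scala
object IfElsePrecedenceSketch {
  // Buggy shape: "+ suffix" is parsed as part of the else branch,
  // so the suffix is dropped whenever the if branch is taken.
  def buggy(messages: Seq[String], suffix: String): String = {
    if (messages.size == 1) {
      messages.head
    } else {
      messages.mkString("\n")
    } + suffix
  }

  // One possible fix: name the if/else result, then append the suffix.
  def fixed(messages: Seq[String], suffix: String): String = {
    val base =
      if (messages.size == 1) {
        messages.head
      } else {
        messages.mkString("\n")
      }
    base + suffix
  }
}
```

With a single message, `buggy` returns the message without the suffix while `fixed` appends it; with several messages both behave the same, which is presumably why this kind of bug can go unnoticed.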
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r115884037

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
    +        context.markTaskCompleted(None)
--- End diff --

Just add `try...finally` to wrap this line and fix the style.
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17942#discussion_r115883997

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
    @@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
               case t: Throwable =>
                 e.addSuppressed(t)
             }
    +        context.markTaskCompleted(Some(e))
             throw e
         } finally {
    -      // Call the task completion callbacks.
    -      context.markTaskCompleted()
           try {
    -        Utils.tryLogNonFatalError {
    -          // Release memory used by this thread for unrolling blocks
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
    -          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
    -          // Notify any tasks waiting for execution memory to be freed to wake up and try to
    -          // acquire memory again. This makes impossible the scenario where a task sleeps forever
    -          // because there are no other tasks left to notify it. Since this is safe to do but may
    -          // not be strictly necessary, we should revisit whether we can remove this in the future.
    -          val memoryManager = SparkEnv.get.memoryManager
    -          memoryManager.synchronized { memoryManager.notifyAll() }
    -        }
    +        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
    +        // one is no-op.
    +        context.markTaskCompleted(None)
           } finally {
    -        // Though we unset the ThreadLocal here, the context member variable itself is still queried
    -        // directly in the TaskRunner to check for FetchFailedExceptions.
    -        TaskContext.unset()
    +        try {
--- End diff --

Just add `try...finally` and fix the style.
[GitHub] spark pull request #17942: [SPARK-20702][Core]TaskContextImpl.markTaskComple...
GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/17942

    [SPARK-20702][Core]TaskContextImpl.markTaskCompleted should not hide the original error

    ## What changes were proposed in this pull request?

    This PR adds an `error` parameter to `TaskContextImpl.markTaskCompleted` to propagate the original error.

    It also fixes a bug where `TaskCompletionListenerException.getMessage` does not include `previousError`.

    ## How was this patch tested?

    New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-20702

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17942.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17942

commit 5c500686ad91e1c95e1b2fe9985686a813ff1d12
Author: Shixiong Zhu
Date:   2017-05-10T20:55:22Z

    Rethrow the original error
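As a rough illustration of the idea in the description, the sketch below passes the task's original error into the completion step and carries it in the exception raised when a listener fails, so the message reports both. All names (`SketchListenerException`, `ErrorPropagationSketch`, `demo`) and the exact message wording are assumptions made for this example; the real `TaskContextImpl` and `TaskCompletionListenerException` are not reproduced here.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplification of a listener exception that remembers the earlier task error.
class SketchListenerException(
    errorMessages: Seq[String],
    val previousError: Option[Throwable])
  extends RuntimeException {

  override def getMessage: String = {
    val listenerPart =
      if (errorMessages.size == 1) {
        errorMessages.head
      } else {
        errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n")
      }
    // Append the original task error, if there was one.
    val previousPart = previousError
      .map(e => "\n\nPrevious exception in task: " + e.getMessage)
      .getOrElse("")
    listenerPart + previousPart
  }
}

object ErrorPropagationSketch {
  // Runs every listener, collects their failures, and rethrows them together
  // with the error the task had already hit (the assumed `error` parameter).
  def markTaskCompleted(listeners: Seq[() => Unit], error: Option[Throwable]): Unit = {
    val errorMsgs = ArrayBuffer.empty[String]
    listeners.foreach { listener =>
      try listener()
      catch { case e: Throwable => errorMsgs += e.getMessage }
    }
    if (errorMsgs.nonEmpty) {
      throw new SketchListenerException(errorMsgs.toList, error)
    }
  }

  // A task that failed with "task failed" and whose only listener also fails.
  def demo(): String = {
    val failing: () => Unit = () => throw new IllegalStateException("listener failed")
    try {
      markTaskCompleted(Seq(failing), Some(new RuntimeException("task failed")))
      "no error"
    } catch {
      case e: SketchListenerException => e.getMessage
    }
  }
}
```

In this sketch the listener failure no longer hides the task's own error: the resulting message contains both "listener failed" and "task failed".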