[GitHub] spark pull request #19852: [SPARK-22655][PySpark] Throw exception rather tha...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19852

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r155701675

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -317,10 +317,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         logDebug("Exception thrown after task interruption", e)
         throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason"))
-      case e: Exception if env.isStopped =>
--- End diff --

Note to myself: this was added in https://github.com/apache/spark/pull/2838
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r155686222

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

Makes sense. I will remove it.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r155136340

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

Then shall we just remove this case, to be fully consistent with Java tasks? I.e., no logging here and no extra SparkException wrapping the original exception.
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r154990418

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

A Python task has the same retry mechanism as a Java task: when a Python task fails, the corresponding Java task (EvalPythonExec/FlatMapGroupsInPandasExec) also fails, and that Java task is retried. I think the Python/Java behavior would be consistent with this patch: when the Spark session is stopped, the Java task will receive an exception when reading from the Python task's output, and then throw a SparkException. The Java task will then be enqueued for retry, at which point it behaves the same as any other Java task enqueued for retry during shutdown.
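The retry semantics described above can be sketched with a small, self-contained Scala example. This is purely illustrative, not Spark's actual scheduler: `runWithRetries` and its parameters are hypothetical names modeling "a failed attempt is re-enqueued until it succeeds or the retry budget is exhausted".

```scala
// Illustrative sketch (NOT Spark's scheduler) of the retry behavior described
// above: when the Python side throws, the enclosing "Java task" attempt fails
// and is retried, up to maxRetries attempts in total.
def runWithRetries[T](maxRetries: Int)(attempt: Int => T): T = {
  var result: Option[T] = None
  var lastError: Throwable = null
  var i = 0
  while (result.isEmpty && i < maxRetries) {
    try result = Some(attempt(i))               // run attempt number i
    catch { case e: Exception => lastError = e } // attempt failed; loop retries
    i += 1
  }
  // All attempts exhausted: surface the last failure instead of a partial result.
  result.getOrElse(throw lastError)
}
```

With this model, an attempt that fails twice (e.g. because the Python side threw during shutdown) and then succeeds is transparent to the caller, which matches the claim that throwing from PythonRunner simply feeds the normal retry path.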
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r154852742

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

It seems a Java task doesn't check `env.isStopped` when the task fails, and only checks it when a retry fails. Does a Python task have a retry mechanism?
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r154737303

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

@cloud-fan Thoughts?
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r154122767

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

I checked the code; there are two places where `env.isStopped` is used:

* PythonRunner completes silently when env.isStopped is true
* When retrying a failed task while env.isStopped is true, Spark ignores the RejectedExecutionException.

As far as Java tasks go, I don't think there is any code that makes them exit silently during shutdown.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r154035525

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       case e: Exception if env.isStopped =>
         logDebug("Exception thrown after context is stopped", e)
-        null.asInstanceOf[OUT] // exit silently
+        throw new SparkException("Spark session has been stopped", e)
--- End diff --

Is this behavior consistent with Java tasks?
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/19852

[SPARK-22655][PySpark] Throw exception rather than exit silently in PythonRunner when Spark session is stopped

## What changes were proposed in this pull request?

We have observed in our production environment that during Spark shutdown, if there are active tasks, they sometimes complete with incorrect results. We've tracked the issue down to PythonRunner, which returns a partial result instead of throwing an exception during Spark shutdown. I think the better way to handle this is to have these tasks fail rather than complete with partial results (completing with partial results is always bad IMHO).

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark python-runner-shutdown

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19852.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19852

commit 75c8191ff51c0f7802ce592fdecca6f551a60687
Author: Li Jin
Date: 2017-11-29T22:00:42Z

    Throw exception rather than exit silently in PythonRunner when Spark session is stopped
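The behavior change proposed above can be illustrated with a minimal, self-contained Scala sketch. This is not Spark's actual `BasePythonRunner`; `SparkException` is stubbed locally and `readNext`/`envIsStopped` are illustrative stand-ins for the reader-thread loop and `env.isStopped`.

```scala
// Minimal stand-in for org.apache.spark.SparkException, for a runnable sketch.
class SparkException(msg: String, cause: Throwable) extends Exception(msg, cause)

// Models the pattern in BasePythonRunner's read loop: `body` produces the next
// output element from the Python worker.
def readNext(envIsStopped: Boolean)(body: => String): String =
  try body
  catch {
    case e: Exception if envIsStopped =>
      // Before this PR: return null ("exit silently"), letting the task
      // "succeed" with a partial result during shutdown.
      // After: fail the task loudly so the scheduler sees the error.
      throw new SparkException("Spark session has been stopped", e)
  }
```

The design point of the PR is exactly this one-line swap in the guarded `case e: Exception if env.isStopped` clause: a swallowed exception becomes a propagated one, so a shutdown-time failure surfaces as a task failure instead of an incorrect partial result.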