spark git commit: [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
Repository: spark Updated Branches: refs/heads/branch-2.0 243bdb11d -> 47fc0b9f4 [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead ## What changes were proposed in this pull request? When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of Py4JException before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs. ## How was this patch tested? Jenkins Author: Shixiong Zhu. Closes #15201 from zsxwing/stop-jvm-ssc. (cherry picked from commit 3cdae0ff2f45643df7bc198cb48623526c7eb1a6) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47fc0b9f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47fc0b9f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47fc0b9f Branch: refs/heads/branch-2.0 Commit: 47fc0b9f40d814bc8e19f86dad591d4aed467222 Parents: 243bdb1 Author: Shixiong Zhu Authored: Thu Sep 22 14:26:45 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 14:26:53 2016 -0700 -- .../streaming/api/python/PythonDStream.scala| 33 ++-- .../streaming/scheduler/JobGenerator.scala | 2 ++ .../streaming/scheduler/JobScheduler.scala | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index aeff4d7..46bfc60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ import 
scala.language.existentials +import py4j.Py4JException + import org.apache.spark.SparkException import org.apache.spark.api.java._ +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time} import org.apache.spark.streaming.api.java._ import org.apache.spark.streaming.dstream._ import org.apache.spark.util.Utils @@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer { /** * Helper functions, which are called from Python via Py4J. */ -private[python] object PythonDStream { +private[streaming] object PythonDStream { /** * can not access PythonTransformFunctionSerializer.register() via Py4j @@ -184,6 +187,32 @@ private[python] object PythonDStream { rdds.asScala.foreach(queue.add) queue } + + /** + * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot + * stop it in the Python side. + */ + def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = { +// These two special messages are from: +// scalastyle:off +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218 +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340 +// scalastyle:on +if (e.isInstanceOf[Py4JException] && + ("Cannot obtain a new communication channel" == e.getMessage || +"Error while obtaining a new communication channel" == e.getMessage)) { + // Start a new thread to stop StreamingContext to avoid deadlock. + new Thread("Stop-StreamingContext") with Logging { +setDaemon(true) + +override def run(): Unit = { + logError( +"Cannot connect to Python process. It's probably dead. 
Stopping StreamingContext.", e) + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) +} + }.start() +} + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 19c88f1..4489a53 100644 ---
spark git commit: [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
Repository: spark Updated Branches: refs/heads/master 85d609cf2 -> 3cdae0ff2 [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead ## What changes were proposed in this pull request? When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of Py4JException before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs. ## How was this patch tested? Jenkins Author: Shixiong Zhu. Closes #15201 from zsxwing/stop-jvm-ssc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cdae0ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cdae0ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cdae0ff Branch: refs/heads/master Commit: 3cdae0ff2f45643df7bc198cb48623526c7eb1a6 Parents: 85d609c Author: Shixiong Zhu Authored: Thu Sep 22 14:26:45 2016 -0700 Committer: Shixiong Zhu Committed: Thu Sep 22 14:26:45 2016 -0700 -- .../streaming/api/python/PythonDStream.scala| 33 ++-- .../streaming/scheduler/JobGenerator.scala | 2 ++ .../streaming/scheduler/JobScheduler.scala | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cdae0ff/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index aeff4d7..46bfc60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ import scala.language.existentials +import py4j.Py4JException + import org.apache.spark.SparkException import 
org.apache.spark.api.java._ +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time} import org.apache.spark.streaming.api.java._ import org.apache.spark.streaming.dstream._ import org.apache.spark.util.Utils @@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer { /** * Helper functions, which are called from Python via Py4J. */ -private[python] object PythonDStream { +private[streaming] object PythonDStream { /** * can not access PythonTransformFunctionSerializer.register() via Py4j @@ -184,6 +187,32 @@ private[python] object PythonDStream { rdds.asScala.foreach(queue.add) queue } + + /** + * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot + * stop it in the Python side. + */ + def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = { +// These two special messages are from: +// scalastyle:off +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218 +// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340 +// scalastyle:on +if (e.isInstanceOf[Py4JException] && + ("Cannot obtain a new communication channel" == e.getMessage || +"Error while obtaining a new communication channel" == e.getMessage)) { + // Start a new thread to stop StreamingContext to avoid deadlock. + new Thread("Stop-StreamingContext") with Logging { +setDaemon(true) + +override def run(): Unit = { + logError( +"Cannot connect to Python process. It's probably dead. 
Stopping StreamingContext.", e) + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) +} + }.start() +} + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/3cdae0ff/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 10d64f9..8d83dc8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++