spark git commit: [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead

2016-09-22 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 243bdb11d -> 47fc0b9f4


[SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is 
dead

## What changes were proposed in this pull request?

When the Python process is dead, the JVM StreamingContext is still running. 
Hence we will see a lot of Py4JException before the JVM process exits. It's 
better to stop the JVM StreamingContext to avoid those annoying logs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #15201 from zsxwing/stop-jvm-ssc.

(cherry picked from commit 3cdae0ff2f45643df7bc198cb48623526c7eb1a6)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47fc0b9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47fc0b9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47fc0b9f

Branch: refs/heads/branch-2.0
Commit: 47fc0b9f40d814bc8e19f86dad591d4aed467222
Parents: 243bdb1
Author: Shixiong Zhu 
Authored: Thu Sep 22 14:26:45 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Sep 22 14:26:53 2016 -0700

--
 .../streaming/api/python/PythonDStream.scala| 33 ++--
 .../streaming/scheduler/JobGenerator.scala  |  2 ++
 .../streaming/scheduler/JobScheduler.scala  |  2 ++
 3 files changed, 35 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index aeff4d7..46bfc60 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import scala.collection.JavaConverters._
 import scala.language.existentials
 
+import py4j.Py4JException
+
 import org.apache.spark.SparkException
 import org.apache.spark.api.java._
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time}
+import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
 import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.util.Utils
@@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
 /**
  * Helper functions, which are called from Python via Py4J.
  */
-private[python] object PythonDStream {
+private[streaming] object PythonDStream {
 
   /**
* can not access PythonTransformFunctionSerializer.register() via Py4j
@@ -184,6 +187,32 @@ private[python] object PythonDStream {
 rdds.asScala.foreach(queue.add)
 queue
   }
+
+  /**
+   * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot
+   * stop it in the Python side.
+   */
+  def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
+// These two special messages are from:
+// scalastyle:off
+// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
+// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
+// scalastyle:on
+if (e.isInstanceOf[Py4JException] &&
+  ("Cannot obtain a new communication channel" == e.getMessage ||
+"Error while obtaining a new communication channel" == e.getMessage)) {
+  // Start a new thread to stop StreamingContext to avoid deadlock.
+  new Thread("Stop-StreamingContext") with Logging {
+setDaemon(true)
+
+override def run(): Unit = {
+  logError(
+"Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
+  StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
+}
+  }.start()
+}
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 19c88f1..4489a53 100644
--- 

spark git commit: [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead

2016-09-22 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 85d609cf2 -> 3cdae0ff2


[SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is 
dead

## What changes were proposed in this pull request?

When the Python process is dead, the JVM StreamingContext is still running. 
Hence we will see a lot of Py4JException before the JVM process exits. It's 
better to stop the JVM StreamingContext to avoid those annoying logs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #15201 from zsxwing/stop-jvm-ssc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cdae0ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cdae0ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cdae0ff

Branch: refs/heads/master
Commit: 3cdae0ff2f45643df7bc198cb48623526c7eb1a6
Parents: 85d609c
Author: Shixiong Zhu 
Authored: Thu Sep 22 14:26:45 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Sep 22 14:26:45 2016 -0700

--
 .../streaming/api/python/PythonDStream.scala| 33 ++--
 .../streaming/scheduler/JobGenerator.scala  |  2 ++
 .../streaming/scheduler/JobScheduler.scala  |  2 ++
 3 files changed, 35 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3cdae0ff/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index aeff4d7..46bfc60 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import scala.collection.JavaConverters._
 import scala.language.existentials
 
+import py4j.Py4JException
+
 import org.apache.spark.SparkException
 import org.apache.spark.api.java._
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time}
+import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
 import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.util.Utils
@@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
 /**
  * Helper functions, which are called from Python via Py4J.
  */
-private[python] object PythonDStream {
+private[streaming] object PythonDStream {
 
   /**
* can not access PythonTransformFunctionSerializer.register() via Py4j
@@ -184,6 +187,32 @@ private[python] object PythonDStream {
 rdds.asScala.foreach(queue.add)
 queue
   }
+
+  /**
+   * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot
+   * stop it in the Python side.
+   */
+  def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
+// These two special messages are from:
+// scalastyle:off
+// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
+// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
+// scalastyle:on
+if (e.isInstanceOf[Py4JException] &&
+  ("Cannot obtain a new communication channel" == e.getMessage ||
+"Error while obtaining a new communication channel" == e.getMessage)) {
+  // Start a new thread to stop StreamingContext to avoid deadlock.
+  new Thread("Stop-StreamingContext") with Logging {
+setDaemon(true)
+
+override def run(): Unit = {
+  logError(
+"Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
+  StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
+}
+  }.start()
+}
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3cdae0ff/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 10d64f9..8d83dc8 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++