Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master f11cdf699 -> 031ae9eb0


ZEPPELIN-22 PySparkInterpreter hanging without error message

When something goes wrong, such as a misconfigured spark.home property, %pyspark hangs.
This PR makes Zeppelin print an error message instead of waiting forever.

Here's an example of the error message printed when it fails to load the py4j package.
Previously it would just hang.
![image](https://cloud.githubusercontent.com/assets/1540981/6978258/9e10caa8-da09-11e4-82da-80f935502f5b.png)
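
For context, the core of the fix is a wait/notify handshake with a timeout between the interpreter thread and the python process. Below is a minimal standalone sketch of that pattern (class and method names are illustrative, not Zeppelin's actual API), assuming the same 10-second deadline the patch uses:

// Minimal sketch (not Zeppelin code) of the initialization handshake this
// commit introduces: the interpreter thread waits on a monitor with a
// deadline while the callback, which py4j lets the python side invoke,
// flips a flag and wakes it up.
public class InitHandshakeSketch {
  private final Object notifier = new Object();
  private volatile boolean initialized = false;

  // In Zeppelin this corresponds to onPythonScriptInitialized(), called from
  // zeppelin_pyspark.py through the py4j gateway entry point.
  public void onScriptInitialized() {
    synchronized (notifier) {
      initialized = true;
      notifier.notifyAll();
    }
  }

  // Returns true if the script reported readiness before the deadline.
  public boolean awaitInitialization(long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    synchronized (notifier) {
      while (!initialized && System.currentTimeMillis() < deadline) {
        notifier.wait(1000);
      }
      return initialized;
    }
  }

  public static void main(String[] args) throws Exception {
    InitHandshakeSketch sketch = new InitHandshakeSketch();
    // Simulate the python side calling back after two seconds.
    new Thread(() -> {
      try {
        Thread.sleep(2000);
      } catch (InterruptedException ignored) {
      }
      sketch.onScriptInitialized();
    }).start();
    System.out.println("initialized = " + sketch.awaitInitialization(10 * 1000));
  }
}

Waking up every second rather than sleeping for the full timeout is what also lets the loop in the actual patch bail out early when pythonscriptRunning flips to false.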

Author: Lee moon soo <[email protected]>

Closes #26 from Leemoonsoo/ZEPPELIN-22 and squashes the following commits:

5910825 [Lee moon soo] ZEPPELIN-22 handle pyspark initialize error and print error message


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/031ae9eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/031ae9eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/031ae9eb

Branch: refs/heads/master
Commit: 031ae9eb0cffca752ab5d1483a94e25234697857
Parents: f11cdf6
Author: Lee moon soo <[email protected]>
Authored: Fri Apr 3 13:54:28 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sun Apr 5 22:00:34 2015 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      | 38 +++++++++++++++++++-
 .../main/resources/python/zeppelin_pyspark.py   |  4 +--
 2 files changed, 39 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/031ae9eb/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
index 36cdcae..f09667d 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
@@ -238,10 +238,46 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
   }
 
+  boolean pythonScriptInitialized = false;
+  Integer pythonScriptInitializeNotifier = new Integer(0);
+
+  public void onPythonScriptInitialized() {
+    synchronized (pythonScriptInitializeNotifier) {
+      pythonScriptInitialized = true;
+      pythonScriptInitializeNotifier.notifyAll();
+    }
+  }
+
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
     if (!pythonscriptRunning) {
-      return new InterpreterResult(Code.ERROR, "python process not running");
+      return new InterpreterResult(Code.ERROR, "python process not running"
+          + outputStream.toString());
+    }
+
+    outputStream.reset();
+
+    synchronized (pythonScriptInitializeNotifier) {
+      long startTime = System.currentTimeMillis();
+      while (pythonScriptInitialized == false
+          && pythonscriptRunning
+          && System.currentTimeMillis() - startTime < 10 * 1000) {
+        try {
+          pythonScriptInitializeNotifier.wait(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    if (pythonscriptRunning == false) {
+      // python script failed to initialize and terminated
+      return new InterpreterResult(Code.ERROR, "failed to start pyspark"
+          + outputStream.toString());
+    }
+    if (pythonScriptInitialized == false) {
+      // timeout. didn't get initialized message
+      return new InterpreterResult(Code.ERROR, "pyspark is not responding "
+          + outputStream.toString());
     }
 
     SparkInterpreter sparkInterpreter = getSparkInterpreter();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/031ae9eb/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index b822c0d..92baf58 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -23,6 +23,8 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*")
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 
 intp = gateway.entry_point
+intp.onPythonScriptInitialized()
+
 jsc = intp.getJavaSparkContext()
 
 if jsc.version().startswith("1.2"):
@@ -37,7 +39,6 @@ elif jsc.version().startswith("1.3"):
 
 java_import(gateway.jvm, "scala.Tuple2")
 
-
 jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
@@ -62,7 +63,6 @@ output = Logger()
 sys.stdout = output
 sys.stderr = output
 
-
 while True :
   req = intp.getStatements()
   try:
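
As an aside, gateway.entry_point above is py4j's handle to the Java object registered with the gateway, which is what lets the script call back into onPythonScriptInitialized(). A rough, hypothetical sketch of the Java side of such a setup (Zeppelin's real wiring in PySparkInterpreter differs):

import py4j.GatewayServer;

// Hypothetical, simplified entry point; in Zeppelin the entry point is the
// PySparkInterpreter instance itself.
public class EntryPointSketch {
  public void onPythonScriptInitialized() {
    System.out.println("python side reported ready");
  }

  public static void main(String[] args) {
    // The python process connects to this gateway; gateway.entry_point then
    // resolves to the EntryPointSketch instance passed in here.
    GatewayServer server = new GatewayServer(new EntryPointSketch());
    server.start();
  }
}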
