Repository: incubator-zeppelin Updated Branches: refs/heads/master f11cdf699 -> 031ae9eb0
ZEPPELIN-22 PySparkInterpreter hanging without error message When something goes wrong, like misconfiguring spark.home property, %pyspark is hanging. This PR makes Zeppelin print some error instead of waiting forever. Here's an example of the error message printed when it failed to load the py4j package. Previously it was just hanging.  Author: Lee moon soo <[email protected]> Closes #26 from Leemoonsoo/ZEPPELIN-22 and squashes the following commits: 5910825 [Lee moon soo] ZEPPELIN-22 handle pyspark initialize error and print error message Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/031ae9eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/031ae9eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/031ae9eb Branch: refs/heads/master Commit: 031ae9eb0cffca752ab5d1483a94e25234697857 Parents: f11cdf6 Author: Lee moon soo <[email protected]> Authored: Fri Apr 3 13:54:28 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Sun Apr 5 22:00:34 2015 +0900 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 38 +++++++++++++++++++- .../main/resources/python/zeppelin_pyspark.py | 4 +-- 2 files changed, 39 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/031ae9eb/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java index 36cdcae..f09667d 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java @@ -238,10 +238,46 @@ public 
class PySparkInterpreter extends Interpreter implements ExecuteResultHand } + boolean pythonScriptInitialized = false; + Integer pythonScriptInitializeNotifier = new Integer(0); + + public void onPythonScriptInitialized() { + synchronized (pythonScriptInitializeNotifier) { + pythonScriptInitialized = true; + pythonScriptInitializeNotifier.notifyAll(); + } + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { if (!pythonscriptRunning) { - return new InterpreterResult(Code.ERROR, "python process not running"); + return new InterpreterResult(Code.ERROR, "python process not running" + + outputStream.toString()); + } + + outputStream.reset(); + + synchronized (pythonScriptInitializeNotifier) { + long startTime = System.currentTimeMillis(); + while (pythonScriptInitialized == false + && pythonscriptRunning + && System.currentTimeMillis() - startTime < 10 * 1000) { + try { + pythonScriptInitializeNotifier.wait(1000); + } catch (InterruptedException e) { + } + } + } + + if (pythonscriptRunning == false) { + // python script failed to initialize and terminated + return new InterpreterResult(Code.ERROR, "failed to start pyspark" + + outputStream.toString()); + } + if (pythonScriptInitialized == false) { + // timeout. 
didn't get initialized message + return new InterpreterResult(Code.ERROR, "pyspark is not responding " + + outputStream.toString()); } SparkInterpreter sparkInterpreter = getSparkInterpreter(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/031ae9eb/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index b822c0d..92baf58 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -23,6 +23,8 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") intp = gateway.entry_point +intp.onPythonScriptInitialized() + jsc = intp.getJavaSparkContext() if jsc.version().startswith("1.2"): @@ -37,7 +39,6 @@ elif jsc.version().startswith("1.3"): java_import(gateway.jvm, "scala.Tuple2") - jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) @@ -62,7 +63,6 @@ output = Logger() sys.stdout = output sys.stderr = output - while True : req = intp.getStatements() try:
