Implement job cancellation and progress reporting for the PySpark interpreter
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f79ff2f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f79ff2f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f79ff2f8 Branch: refs/heads/master Commit: f79ff2f8ce9432e04467aeef5aa71e98b993136c Parents: 8db2240 Author: Lee moon soo <[email protected]> Authored: Thu Mar 12 08:20:18 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Thu Mar 12 08:20:18 2015 +0900 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 86 +++++++++++--------- .../nflabs/zeppelin/spark/SparkInterpreter.java | 6 +- .../main/resources/python/zeppelin_pyspark.py | 6 +- 3 files changed, 57 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f79ff2f8/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java index 9cc1e4c..51c7496 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java @@ -9,6 +9,7 @@ import java.io.OutputStreamWriter; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.net.ServerSocket; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -186,21 +187,42 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand gatewayServer.shutdown(); } - private String _statements; + PythonInterpretRequest pythonInterpretRequest = null; + + /** + * + */ + public class 
PythonInterpretRequest { + public String statements; + public String jobGroup; + + public PythonInterpretRequest(String statements, String jobGroup) { + this.statements = statements; + this.jobGroup = jobGroup; + } + + public String statements() { + return statements; + } + + public String jobGroup() { + return jobGroup; + } + } Integer statementSetNotifier = new Integer(0); - public String getStatements() { + public PythonInterpretRequest getStatements() { synchronized (statementSetNotifier) { - while (_statements == null) { + while (pythonInterpretRequest == null) { try { statementSetNotifier.wait(1000); } catch (InterruptedException e) { } } - String st = _statements; - _statements = null; - return st; + PythonInterpretRequest req = pythonInterpretRequest; + pythonInterpretRequest = null; + return req; } } @@ -223,7 +245,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return new InterpreterResult(Code.ERROR, "python process not running"); } - _statements = st; + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + String jobGroup = sparkInterpreter.getJobGroup(context); + + pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup); statementOutput = null; synchronized (statementSetNotifier) { @@ -239,44 +264,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } } - //System.out.println("from python = "+statementFinished); + if (statementError) { return new InterpreterResult(Code.ERROR, statementOutput); } else { return new InterpreterResult(Code.SUCCESS, statementOutput); } - - /* - //outputStream.reset(); - try { - System.out.println("> is="+in.available()+", "+outputStream.size()); - input.write((st + "\n").getBytes()); - input.flush(); - System.out.println("- is="+in.available()+", "+outputStream.size()); - } catch (IOException e) { - throw new InterpreterException(e); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - - } - try { - 
System.out.println("< is="+in.available()+", "+outputStream.size()); - } catch (IOException e) { - } - - outputStream.size(); - String result = outputStream.toString(); - System.out.println("Result = " + result); - logger.info("pyspark result " + result); - return new InterpreterResult(Code.SUCCESS, result); - */ } @Override public void cancel(InterpreterContext context) { - return; + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + sparkInterpreter.cancel(context); } @Override @@ -286,12 +285,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public int getProgress(InterpreterContext context) { - return 0; + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + return sparkInterpreter.getProgress(context); } @Override public List<String> completion(String buf, int cursor) { - return null; + // not supported + return new LinkedList<String>(); } private SparkInterpreter getSparkInterpreter() { @@ -313,6 +314,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return null; } + public ZeppelinContext getZeppelinContext() { + SparkInterpreter sparkIntp = getSparkInterpreter(); + if (sparkIntp != null) { + return getSparkInterpreter().getZeppelinContext(); + } else { + return null; + } + } + public JavaSparkContext getJavaSparkContext() { SparkInterpreter intp = getSparkInterpreter(); if (intp == null) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f79ff2f8/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java index 18f9a79..bc02d0f 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java @@ -436,7 +436,7 @@ public class 
SparkInterpreter extends Interpreter { } } - private String getJobGroup(InterpreterContext context){ + String getJobGroup(InterpreterContext context){ return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId(); } @@ -657,4 +657,8 @@ public class SparkInterpreter extends Interpreter { return SchedulerFactory.singleton().createOrGetFIFOScheduler( SparkInterpreter.class.getName() + this.hashCode()); } + + public ZeppelinContext getZeppelinContext() { + return z; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f79ff2f8/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index d68e53f..4362406 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -54,9 +54,10 @@ sys.stdout = output sys.stderr = output while True : - st = intp.getStatements() + req = intp.getStatements() try: - stmts = st.split("\n") + stmts = req.statements().split("\n") + jobGroup = req.jobGroup() single = None incomplete = None @@ -70,6 +71,7 @@ while True : single += "\n" + s try : + sc.setJobGroup(jobGroup, "Zeppelin") eval(compile(single, "<String>", "single")) single = "" incomplete = None
