Implement job canceling and progress reporting

Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f79ff2f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f79ff2f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f79ff2f8

Branch: refs/heads/master
Commit: f79ff2f8ce9432e04467aeef5aa71e98b993136c
Parents: 8db2240
Author: Lee moon soo <[email protected]>
Authored: Thu Mar 12 08:20:18 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Thu Mar 12 08:20:18 2015 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      | 86 +++++++++++---------
 .../nflabs/zeppelin/spark/SparkInterpreter.java |  6 +-
 .../main/resources/python/zeppelin_pyspark.py   |  6 +-
 3 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f79ff2f8/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
index 9cc1e4c..51c7496 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
@@ -9,6 +9,7 @@ import java.io.OutputStreamWriter;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.net.ServerSocket;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -186,21 +187,42 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
     gatewayServer.shutdown();
   }
 
-  private String _statements;
+  PythonInterpretRequest pythonInterpretRequest = null;
+
+  /**
+   *
+   */
+  public class PythonInterpretRequest {
+    public String statements;
+    public String jobGroup;
+
+    public PythonInterpretRequest(String statements, String jobGroup) {
+      this.statements = statements;
+      this.jobGroup = jobGroup;
+    }
+
+    public String statements() {
+      return statements;
+    }
+
+    public String jobGroup() {
+      return jobGroup;
+    }
+  }
 
   Integer statementSetNotifier = new Integer(0);
 
-  public String getStatements() {
+  public PythonInterpretRequest getStatements() {
     synchronized (statementSetNotifier) {
-      while (_statements == null) {
+      while (pythonInterpretRequest == null) {
         try {
           statementSetNotifier.wait(1000);
         } catch (InterruptedException e) {
         }
       }
-      String st = _statements;
-      _statements = null;
-      return st;
+      PythonInterpretRequest req = pythonInterpretRequest;
+      pythonInterpretRequest = null;
+      return req;
     }
   }
 
@@ -223,7 +245,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
       return new InterpreterResult(Code.ERROR, "python process not running");
     }
 
-    _statements = st;
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    String jobGroup = sparkInterpreter.getJobGroup(context);
+
+    pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
     statementOutput = null;
 
     synchronized (statementSetNotifier) {
@@ -239,44 +264,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
         }
       }
     }
-    //System.out.println("from python = "+statementFinished);
+
     if (statementError) {
       return new InterpreterResult(Code.ERROR, statementOutput);
     } else {
       return new InterpreterResult(Code.SUCCESS, statementOutput);
     }
-
-    /*
-    //outputStream.reset();
-    try {
-      System.out.println("> is="+in.available()+", "+outputStream.size());
-      input.write((st + "\n").getBytes());
-      input.flush();
-      System.out.println("- is="+in.available()+", "+outputStream.size());
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-
-    }
-    try {
-      System.out.println("< is="+in.available()+", "+outputStream.size());
-    } catch (IOException e) {
-    }
-
-    outputStream.size();
-    String result = outputStream.toString();
-    System.out.println("Result = " + result);
-    logger.info("pyspark result " + result);
-    return new InterpreterResult(Code.SUCCESS, result);
-    */
   }
 
   @Override
   public void cancel(InterpreterContext context) {
-    return;
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    sparkInterpreter.cancel(context);
   }
 
   @Override
@@ -286,12 +285,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
 
   @Override
   public int getProgress(InterpreterContext context) {
-    return 0;
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    return sparkInterpreter.getProgress(context);
   }
 
   @Override
   public List<String> completion(String buf, int cursor) {
-    return null;
+    // not supported
+    return new LinkedList<String>();
   }
 
   private SparkInterpreter getSparkInterpreter() {
@@ -313,6 +314,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
     return null;
   }
 
+  public ZeppelinContext getZeppelinContext() {
+    SparkInterpreter sparkIntp = getSparkInterpreter();
+    if (sparkIntp != null) {
+      return getSparkInterpreter().getZeppelinContext();
+    } else {
+      return null;
+    }
+  }
+
   public JavaSparkContext getJavaSparkContext() {
     SparkInterpreter intp = getSparkInterpreter();
     if (intp == null) {
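
Note on the PySparkInterpreter change: the bare _statements string is replaced by a
PythonInterpretRequest that carries both the statements and the paragraph's Spark job
group id, and cancel()/getProgress() now delegate to the embedded SparkInterpreter.
The sketch below is not part of the commit; it only illustrates, against Spark's public
job-group and status-tracker APIs (available since Spark 1.2), why a shared group id is
enough for cancellation and rough progress reporting. Class and method names here are
illustrative.

  import org.apache.spark.SparkJobInfo;
  import org.apache.spark.SparkStageInfo;
  import org.apache.spark.api.java.JavaSparkContext;

  // Illustrative sketch, not Zeppelin code: once every job of a paragraph runs
  // under one job group id, cancel and progress reduce to a few SparkContext calls.
  public class JobGroupSketch {

    // Cancel all active Spark jobs started under the given group id.
    public static void cancel(JavaSparkContext sc, String jobGroup) {
      sc.cancelJobGroup(jobGroup);
    }

    // Rough progress for the group: completed tasks over total tasks.
    public static int progressPercent(JavaSparkContext sc, String jobGroup) {
      int total = 0;
      int completed = 0;
      for (int jobId : sc.statusTracker().getJobIdsForGroup(jobGroup)) {
        SparkJobInfo job = sc.statusTracker().getJobInfo(jobId);
        if (job == null) {
          continue;
        }
        for (int stageId : job.stageIds()) {
          SparkStageInfo stage = sc.statusTracker().getStageInfo(stageId);
          if (stage == null) {
            continue;
          }
          total += stage.numTasks();
          completed += stage.numCompletedTasks();
        }
      }
      return total == 0 ? 0 : (100 * completed) / total;
    }
  }

In the interpreter itself, SparkInterpreter.cancel() and SparkInterpreter.getProgress()
play this role; the commit only makes sure the PySpark side tags its work with the same
group id and routes both calls through the SparkInterpreter.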

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f79ff2f8/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
index 18f9a79..bc02d0f 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
@@ -436,7 +436,7 @@ public class SparkInterpreter extends Interpreter {
     }
   }
 
-  private String getJobGroup(InterpreterContext context){
+  String getJobGroup(InterpreterContext context){
     return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
   }
 
@@ -657,4 +657,8 @@ public class SparkInterpreter extends Interpreter {
     return SchedulerFactory.singleton().createOrGetFIFOScheduler(
       SparkInterpreter.class.getName() + this.hashCode());
   }
+
+  public ZeppelinContext getZeppelinContext() {
+    return z;
+  }
 }
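
Note on the SparkInterpreter change: getJobGroup() is widened from private to
package-private so that PySparkInterpreter can obtain the same per-paragraph group id
("zeppelin-" + interpreter hash code + "-" + paragraph id), and getZeppelinContext()
exposes the shared ZeppelinContext to the PySpark interpreter.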

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f79ff2f8/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index d68e53f..4362406 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -54,9 +54,10 @@ sys.stdout = output
 sys.stderr = output
 
 while True :
-  st = intp.getStatements()
+  req = intp.getStatements()
   try:
-    stmts = st.split("\n")
+    stmts = req.statements().split("\n")
+    jobGroup = req.jobGroup()
     single = None
     incomplete = None
 
@@ -70,6 +71,7 @@ while True :
         single += "\n" + s
 
       try :
+        sc.setJobGroup(jobGroup, "Zeppelin")
         eval(compile(single, "<String>", "single"))
         single = ""
         incomplete = None
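
Note on the zeppelin_pyspark.py change: the REPL loop now receives a
PythonInterpretRequest instead of a raw string and calls
sc.setJobGroup(jobGroup, "Zeppelin") before each statement is evaluated, so every Spark
job triggered from PySpark is tagged with the paragraph's group id. That tagging is what
lets the job-group based cancel and progress lookups on the Java side, via
SparkInterpreter.cancel() and getProgress(), reach jobs submitted from Python.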
