[5/6] zeppelin git commit: ZEPPELIN-3375: Make PySparkInterpreter extends PythonInterpreter

2018-06-08 Thread zjffdu
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f4070df/python/src/main/resources/python/zeppelin_python.py
--
diff --git a/python/src/main/resources/python/zeppelin_python.py 
b/python/src/main/resources/python/zeppelin_python.py
index 0b2d533..19fa220 100644
--- a/python/src/main/resources/python/zeppelin_python.py
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -15,24 +15,12 @@
 # limitations under the License.
 #
 
-import os, sys, getopt, traceback, json, re
+import os, sys, traceback, json, re
 
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
-from py4j.protocol import Py4JJavaError, Py4JNetworkError
-import warnings
-import ast
-import traceback
-import warnings
-import signal
-import base64
-
-from io import BytesIO
-try:
-from StringIO import StringIO
-except ImportError:
-from io import StringIO
+from py4j.protocol import Py4JJavaError
 
-# for back compatibility
+import ast
 
 class Logger(object):
   def __init__(self):
@@ -47,46 +35,79 @@ class Logger(object):
   def flush(self):
 pass
 
-def handler_stop_signals(sig, frame):
-  sys.exit("Got signal : " + str(sig))
 
+class PythonCompletion:
+  def __init__(self, interpreter, userNameSpace):
+self.interpreter = interpreter
+self.userNameSpace = userNameSpace
 
-signal.signal(signal.SIGINT, handler_stop_signals)
+  def getObjectCompletion(self, text_value):
+completions = [completion for completion in 
list(self.userNameSpace.keys()) if completion.startswith(text_value)]
+builtinCompletions = [completion for completion in dir(__builtins__) if 
completion.startswith(text_value)]
+return completions + builtinCompletions
 
-host = "127.0.0.1"
-if len(sys.argv) >= 3:
-  host = sys.argv[2]
+  def getMethodCompletion(self, objName, methodName):
+execResult = locals()
+try:
+  exec("{} = dir({})".format("objectDefList", objName), 
_zcUserQueryNameSpace, execResult)
+except:
+  self.interpreter.logPythonOutput("Fail to run dir on " + objName)
+  self.interpreter.logPythonOutput(traceback.format_exc())
+  return None
+else:
+  objectDefList = execResult['objectDefList']
+  return [completion for completion in execResult['objectDefList'] if 
completion.startswith(methodName)]
+
+  def getCompletion(self, text_value):
+if text_value == None:
+  return None
+
+dotPos = text_value.find(".")
+if dotPos == -1:
+  objName = text_value
+  completionList = self.getObjectCompletion(objName)
+else:
+  objName = text_value[:dotPos]
+  methodName = text_value[dotPos + 1:]
+  completionList = self.getMethodCompletion(objName, methodName)
+
+if completionList is None or len(completionList) <= 0:
+  self.interpreter.setStatementsFinished("", False)
+else:
+  result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), 
list(completionList
+  self.interpreter.setStatementsFinished(result, False)
+
+host = sys.argv[1]
+port = int(sys.argv[2])
+
+client = GatewayClient(address=host, port=port)
+gateway = JavaGateway(client, auto_convert = True)
+intp = gateway.entry_point
+# redirect stdout/stderr to java side so that PythonInterpreter can capture 
the python execution result
+output = Logger()
+sys.stdout = output
+sys.stderr = output
 
 _zcUserQueryNameSpace = {}
-client = GatewayClient(address=host, port=int(sys.argv[1]))
-
-gateway = JavaGateway(client)
-
-intp = gateway.entry_point
-intp.onPythonScriptInitialized(os.getpid())
 
-java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
+completion = PythonCompletion(intp, _zcUserQueryNameSpace)
+_zcUserQueryNameSpace["__zeppelin_completion__"] = completion
+_zcUserQueryNameSpace["gateway"] = gateway
 
 from zeppelin_context import PyZeppelinContext
+if intp.getZeppelinContext():
+  z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)
+  __zeppelin__._setup_matplotlib()
+  _zcUserQueryNameSpace["z"] = z
+  _zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
 
-z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)
-__zeppelin__._setup_matplotlib()
-
-_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
-_zcUserQueryNameSpace["z"] = z
-
-output = Logger()
-sys.stdout = output
-#sys.stderr = output
+intp.onPythonScriptInitialized(os.getpid())
 
 while True :
   req = intp.getStatements()
-  if req == None:
-break
-
   try:
 stmts = req.statements().split("\n")
-final_code = []
+isForCompletion = req.isForCompletion()
 
 # Get post-execute hooks
 try:
@@ -98,35 +119,23 @@ while True :
   user_hook = __zeppelin__.getHook('post_exec')
 except:
   user_hook = None
-  
-nhooks = 0
-for hook in (global_hook, user_hook):
-  if hook:
-nhooks += 1
 
-for s in stmts:
-  if s == None:
-continue
-
-  # skip comment
-  s_stripped = s.strip()
-  if 

[3/6] zeppelin git commit: ZEPPELIN-3374. Improvement on PySparkInterpreter

2018-06-08 Thread zjffdu
ZEPPELIN-3374. Improvement on PySparkInterpreter

A few improvements on PySparkInterpreter.
1. Refactor PySparkInterpreter to make it more readable
2. Code completion feature is totally broken; fix it in this PR
3. Reuse the same test case of IPySparkInterpreter.

[Bug Fix | Improvement | Refactoring]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-3374

* CI pass

Code completion before

![completion_before](https://user-images.githubusercontent.com/164491/38160504-ee1ea3a8-34f1-11e8-9aab-baf98962aae3.gif)

Code completion after
![completion_after](https://user-images.githubusercontent.com/164491/38160505-eff108b0-34f1-11e8-88eb-03c51cfb96de.gif)

* Do the license files need an update? No
* Is there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang 

Closes #2901 from zjffdu/ZEPPELIN-3374 and squashes the following commits:

c22078d [Jeff Zhang] ZEPPELIN-3374. Improvement on PySparkInterpreter

(cherry picked from commit aefc7ea395fed5a93e69942725a19b203a3574e2)
(cherry picked from commit 1bdd0a7d1f51a40d6042779ff0547e2d80001584)


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5550c5fb
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5550c5fb
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5550c5fb

Branch: refs/heads/branch-0.8
Commit: 5550c5fba344d09a432bb289b2438886bdd2d345
Parents: 87d47d6
Author: Jeff Zhang 
Authored: Fri Mar 30 11:12:08 2018 +0800
Committer: Jeff Zhang 
Committed: Fri Jun 8 16:32:19 2018 +0800

--
 .../zeppelin/spark/PySparkInterpreter.java  | 267 +--
 .../main/resources/python/zeppelin_pyspark.py   |  86 +++---
 .../zeppelin/spark/IPySparkInterpreterTest.java | 127 +
 .../zeppelin/spark/PySparkInterpreterTest.java  |  39 +--
 4 files changed, 247 insertions(+), 272 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5550c5fb/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
--
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 2bcf9fe..98b2360 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -41,6 +41,7 @@ import 
org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InvalidHookException;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
@@ -53,11 +54,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.net.MalformedURLException;
-import java.net.ServerSocket;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.LinkedList;
@@ -66,65 +64,26 @@ import java.util.Map;
 import java.util.Properties;
 
 /**
- *
+ *  Interpreter for PySpark, it is the first implementation of interpreter for 
PySpark, so with less
+ *  features compared to IPySparkInterpreter, but requires less prerequisites 
than
+ *  IPySparkInterpreter, only python is required.
  */
 public class PySparkInterpreter extends Interpreter implements 
ExecuteResultHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PySparkInterpreter.class);
+  private static final int MAX_TIMEOUT_SEC = 10;
+
   private GatewayServer gatewayServer;
   private DefaultExecutor executor;
-  private int port;
+  // used to forward output from python process to InterpreterOutput
   private InterpreterOutputStream outputStream;
-  private BufferedWriter ins;
-  private PipedInputStream in;
-  private ByteArrayOutputStream input;
   private String scriptPath;
-  boolean pythonscriptRunning = false;
-  private static final int MAX_TIMEOUT_SEC = 10;
-  private long pythonPid;
-
+  private boolean pythonscriptRunning = false;
+  private long pythonPid = -1;
   private IPySparkInterpreter iPySparkInterpreter;
+  private SparkInterpreter sparkInterpreter;
 
   public PySparkInterpreter(Properties property) {
 super(property);
-
-pythonPid = -1;
-try {
-  File scriptFile = File.createTempFile("zeppelin_pyspark-", 

[4/6] zeppelin git commit: ZEPPELIN-3375: Make PySparkInterpreter extends PythonInterpreter

2018-06-08 Thread zjffdu
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f4070df/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
--
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 98b2360..095c096 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -17,50 +17,29 @@
 
 package org.apache.zeppelin.spark;
 
-import com.google.gson.Gson;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.commons.exec.ExecuteException;
-import org.apache.commons.exec.ExecuteResultHandler;
-import org.apache.commons.exec.ExecuteWatchdog;
-import org.apache.commons.exec.PumpStreamHandler;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InvalidHookException;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.python.IPythonInterpreter;
+import org.apache.zeppelin.python.PythonInterpreter;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import py4j.GatewayServer;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.PipedInputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -68,56 +47,23 @@ import java.util.Properties;
  *  features compared to IPySparkInterpreter, but requires less prerequisites 
than
  *  IPySparkInterpreter, only python is required.
  */
-public class PySparkInterpreter extends Interpreter implements 
ExecuteResultHandler {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PySparkInterpreter.class);
-  private static final int MAX_TIMEOUT_SEC = 10;
+public class PySparkInterpreter extends PythonInterpreter {
+
+  private static Logger LOGGER = 
LoggerFactory.getLogger(PySparkInterpreter.class);
 
-  private GatewayServer gatewayServer;
-  private DefaultExecutor executor;
-  // used to forward output from python process to InterpreterOutput
-  private InterpreterOutputStream outputStream;
-  private String scriptPath;
-  private boolean pythonscriptRunning = false;
-  private long pythonPid = -1;
-  private IPySparkInterpreter iPySparkInterpreter;
   private SparkInterpreter sparkInterpreter;
 
   public PySparkInterpreter(Properties property) {
 super(property);
+this.useBuiltinPy4j = false;
   }
 
   @Override
   public void open() throws InterpreterException {
-// try IPySparkInterpreter first
-iPySparkInterpreter = getIPySparkInterpreter();
-if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
-StringUtils.isEmpty(
-
iPySparkInterpreter.checkIPythonPrerequisite(getPythonExec(getProperties() {
-  try {
-iPySparkInterpreter.open();
-LOGGER.info("IPython is available, Use IPySparkInterpreter to replace 
PySparkInterpreter");
-return;
-  } catch (Exception e) {
-iPySparkInterpreter = null;
-LOGGER.warn("Fail to open IPySparkInterpreter", e);
-  }
-}
+setProperty("zeppelin.python.useIPython", 
getProperty("zeppelin.pyspark.useIPython", "true"));
 
-// reset iPySparkInterpreter to null as it is not available
-iPySparkInterpreter = null;
-LOGGER.info("IPython is not available, use the native 
PySparkInterpreter\n");
-// Add matplotlib display hook
-InterpreterGroup 

zeppelin git commit: [SECURITY] Secure connection between R process and JVM process

2018-06-08 Thread zjffdu
Repository: zeppelin
Updated Branches:
  refs/heads/master 9fbcacf5c -> b7d98b3f1


[SECURITY] Secure connection between R process and JVM process

(cherry picked from commit 40f1c77f10eb5300e441142e0fa3097b1123b05b)


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b7d98b3f
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b7d98b3f
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b7d98b3f

Branch: refs/heads/master
Commit: b7d98b3f119c7dcdfc7064cd56be8fdb8bbaabd9
Parents: 9fbcacf
Author: Jeff Zhang 
Authored: Mon Jun 4 15:29:45 2018 +0800
Committer: Jeff Zhang 
Committed: Fri Jun 8 16:28:27 2018 +0800

--
 .../zeppelin/spark/SparkRInterpreter.java   | 16 ++---
 .../org/apache/zeppelin/spark/SparkVersion.java |  4 
 .../org/apache/zeppelin/spark/ZeppelinR.java|  5 +++-
 .../src/main/resources/R/zeppelin_sparkr.R  | 12 --
 .../scala/org/apache/spark/SparkRBackend.scala  | 25 +---
 5 files changed, 42 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b7d98b3f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
--
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 896f3a1..6d21450 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -53,6 +53,7 @@ public class SparkRInterpreter extends Interpreter {
   private AtomicBoolean rbackendDead = new AtomicBoolean(false);
   private SparkContext sc;
   private JavaSparkContext jsc;
+  private String secret;
 
   public SparkRInterpreter(Properties property) {
 super(property);
@@ -76,19 +77,18 @@ public class SparkRInterpreter extends Interpreter {
   sparkRLibPath = "sparkr";
 }
 
+this.sparkInterpreter = getSparkInterpreter();
+this.sc = sparkInterpreter.getSparkContext();
+this.jsc = sparkInterpreter.getJavaSparkContext();
+
 // Share the same SparkRBackend across sessions
+SparkVersion sparkVersion = new SparkVersion(sc.version());
 synchronized (SparkRBackend.backend()) {
   if (!SparkRBackend.isStarted()) {
-SparkRBackend.init();
+SparkRBackend.init(sparkVersion);
 SparkRBackend.start();
   }
 }
-
-int port = SparkRBackend.port();
-this.sparkInterpreter = getSparkInterpreter();
-this.sc = sparkInterpreter.getSparkContext();
-this.jsc = sparkInterpreter.getJavaSparkContext();
-SparkVersion sparkVersion = new SparkVersion(sc.version());
 this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
 int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 
6000);
 
@@ -100,7 +100,7 @@ public class SparkRInterpreter extends Interpreter {
 ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
 ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
 
-zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion, 
timeout, this);
+zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), 
sparkVersion, timeout, this);
 try {
   zeppelinR.open();
 } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b7d98b3f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
--
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 09ea332..5e412eb 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@ public class SparkVersion {
   public static final SparkVersion SPARK_1_6_0 = 
SparkVersion.fromVersionString("1.6.0");
 
   public static final SparkVersion SPARK_2_0_0 = 
SparkVersion.fromVersionString("2.0.0");
+  public static final SparkVersion SPARK_2_3_1 = 
SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = 
SparkVersion.fromVersionString("2.4.0");
 
   public static final SparkVersion MIN_SUPPORTED_VERSION =  SPARK_1_0_0;
@@ -108,6 +109,9 @@ public class SparkVersion {
 return this.olderThan(SPARK_1_3_0);
   }
 
+  public boolean isSecretSocketSupported() {
+return this.newerThanEquals(SPARK_2_3_1);
+  }
   public boolean equals(Object versionToCompare) {
 return version == 

[zeppelin] Git Push Summary

2018-06-08 Thread zjffdu
Repository: zeppelin
Updated Branches:
  refs/heads/sparkr [deleted] 40f1c77f1