[5/6] zeppelin git commit: ZEPPELIN-3375: Make PySparkInterpreter extends PythonInterpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f4070df/python/src/main/resources/python/zeppelin_python.py -- diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py index 0b2d533..19fa220 100644 --- a/python/src/main/resources/python/zeppelin_python.py +++ b/python/src/main/resources/python/zeppelin_python.py @@ -15,24 +15,12 @@ # limitations under the License. # -import os, sys, getopt, traceback, json, re +import os, sys, traceback, json, re from py4j.java_gateway import java_import, JavaGateway, GatewayClient -from py4j.protocol import Py4JJavaError, Py4JNetworkError -import warnings -import ast -import traceback -import warnings -import signal -import base64 - -from io import BytesIO -try: -from StringIO import StringIO -except ImportError: -from io import StringIO +from py4j.protocol import Py4JJavaError -# for back compatibility +import ast class Logger(object): def __init__(self): @@ -47,46 +35,79 @@ class Logger(object): def flush(self): pass -def handler_stop_signals(sig, frame): - sys.exit("Got signal : " + str(sig)) +class PythonCompletion: + def __init__(self, interpreter, userNameSpace): +self.interpreter = interpreter +self.userNameSpace = userNameSpace -signal.signal(signal.SIGINT, handler_stop_signals) + def getObjectCompletion(self, text_value): +completions = [completion for completion in list(self.userNameSpace.keys()) if completion.startswith(text_value)] +builtinCompletions = [completion for completion in dir(__builtins__) if completion.startswith(text_value)] +return completions + builtinCompletions -host = "127.0.0.1" -if len(sys.argv) >= 3: - host = sys.argv[2] + def getMethodCompletion(self, objName, methodName): +execResult = locals() +try: + exec("{} = dir({})".format("objectDefList", objName), _zcUserQueryNameSpace, execResult) +except: + self.interpreter.logPythonOutput("Fail to run dir on " + objName) + self.interpreter.logPythonOutput(traceback.format_exc()) + 
return None +else: + objectDefList = execResult['objectDefList'] + return [completion for completion in execResult['objectDefList'] if completion.startswith(methodName)] + + def getCompletion(self, text_value): +if text_value == None: + return None + +dotPos = text_value.find(".") +if dotPos == -1: + objName = text_value + completionList = self.getObjectCompletion(objName) +else: + objName = text_value[:dotPos] + methodName = text_value[dotPos + 1:] + completionList = self.getMethodCompletion(objName, methodName) + +if completionList is None or len(completionList) <= 0: + self.interpreter.setStatementsFinished("", False) +else: + result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList + self.interpreter.setStatementsFinished(result, False) + +host = sys.argv[1] +port = int(sys.argv[2]) + +client = GatewayClient(address=host, port=port) +gateway = JavaGateway(client, auto_convert = True) +intp = gateway.entry_point +# redirect stdout/stderr to java side so that PythonInterpreter can capture the python execution result +output = Logger() +sys.stdout = output +sys.stderr = output _zcUserQueryNameSpace = {} -client = GatewayClient(address=host, port=int(sys.argv[1])) - -gateway = JavaGateway(client) - -intp = gateway.entry_point -intp.onPythonScriptInitialized(os.getpid()) -java_import(gateway.jvm, "org.apache.zeppelin.display.Input") +completion = PythonCompletion(intp, _zcUserQueryNameSpace) +_zcUserQueryNameSpace["__zeppelin_completion__"] = completion +_zcUserQueryNameSpace["gateway"] = gateway from zeppelin_context import PyZeppelinContext +if intp.getZeppelinContext(): + z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway) + __zeppelin__._setup_matplotlib() + _zcUserQueryNameSpace["z"] = z + _zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ -z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway) -__zeppelin__._setup_matplotlib() - -_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ 
-_zcUserQueryNameSpace["z"] = z - -output = Logger() -sys.stdout = output -#sys.stderr = output +intp.onPythonScriptInitialized(os.getpid()) while True : req = intp.getStatements() - if req == None: -break - try: stmts = req.statements().split("\n") -final_code = [] +isForCompletion = req.isForCompletion() # Get post-execute hooks try: @@ -98,35 +119,23 @@ while True : user_hook = __zeppelin__.getHook('post_exec') except: user_hook = None - -nhooks = 0 -for hook in (global_hook, user_hook): - if hook: -nhooks += 1 -for s in stmts: - if s == None: -continue - - # skip comment - s_stripped = s.strip() - if
[3/6] zeppelin git commit: ZEPPELIN-3374. Improvement on PySparkInterpreter
ZEPPELIN-3374. Improvement on PySparkInterpreter A few improvements on PySparkInterpreter. 1. Refactor PySparkInterpreter to make it more readable 2. The code completion feature is totally broken; this PR fixes it 3. Reuse the same test cases as IPySparkInterpreter. [Bug Fix | Improvement | Refactoring] * [ ] - Task * https://issues.apache.org/jira/browse/ZEPPELIN-3374 * CI pass Code completion before ![completion_before](https://user-images.githubusercontent.com/164491/38160504-ee1ea3a8-34f1-11e8-9aab-baf98962aae3.gif) Code completion after ![completion_after](https://user-images.githubusercontent.com/164491/38160505-eff108b0-34f1-11e8-88eb-03c51cfb96de.gif) * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang Closes #2901 from zjffdu/ZEPPELIN-3374 and squashes the following commits: c22078d [Jeff Zhang] ZEPPELIN-3374. Improvement on PySparkInterpreter (cherry picked from commit aefc7ea395fed5a93e69942725a19b203a3574e2) (cherry picked from commit 1bdd0a7d1f51a40d6042779ff0547e2d80001584) Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5550c5fb Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5550c5fb Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5550c5fb Branch: refs/heads/branch-0.8 Commit: 5550c5fba344d09a432bb289b2438886bdd2d345 Parents: 87d47d6 Author: Jeff Zhang Authored: Fri Mar 30 11:12:08 2018 +0800 Committer: Jeff Zhang Committed: Fri Jun 8 16:32:19 2018 +0800 -- .../zeppelin/spark/PySparkInterpreter.java | 267 +-- .../main/resources/python/zeppelin_pyspark.py | 86 +++--- .../zeppelin/spark/IPySparkInterpreterTest.java | 127 + .../zeppelin/spark/PySparkInterpreterTest.java | 39 +-- 4 files changed, 247 insertions(+), 272 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5550c5fb/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java -- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 2bcf9fe..98b2360 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InvalidHookException; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.spark.dep.SparkDependencyContext; @@ -53,11 +54,8 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.net.MalformedURLException; -import java.net.ServerSocket; import java.net.URL; import java.net.URLClassLoader; import java.util.LinkedList; @@ -66,65 +64,26 @@ import java.util.Map; import java.util.Properties; /** - * + * Interpreter for PySpark, it is the first implementation of interpreter for PySpark, so with less + * features compared to IPySparkInterpreter, but requires less prerequisites than + * IPySparkInterpreter, only python is required. 
*/ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler { private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class); + private static final int MAX_TIMEOUT_SEC = 10; + private GatewayServer gatewayServer; private DefaultExecutor executor; - private int port; + // used to forward output from python process to InterpreterOutput private InterpreterOutputStream outputStream; - private BufferedWriter ins; - private PipedInputStream in; - private ByteArrayOutputStream input; private String scriptPath; - boolean pythonscriptRunning = false; - private static final int MAX_TIMEOUT_SEC = 10; - private long pythonPid; - + private boolean pythonscriptRunning = false; + private long pythonPid = -1; private IPySparkInterpreter iPySparkInterpreter; + private SparkInterpreter sparkInterpreter; public PySparkInterpreter(Properties property) { super(property); - -pythonPid = -1; -try { - File scriptFile = File.createTempFile("zeppelin_pyspark-",
[4/6] zeppelin git commit: ZEPPELIN-3375: Make PySparkInterpreter extends PythonInterpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f4070df/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java -- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 98b2360..095c096 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -17,50 +17,29 @@ package org.apache.zeppelin.spark; -import com.google.gson.Gson; -import org.apache.commons.compress.utils.IOUtils; -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.DefaultExecutor; -import org.apache.commons.exec.ExecuteException; -import org.apache.commons.exec.ExecuteResultHandler; -import org.apache.commons.exec.ExecuteWatchdog; -import org.apache.commons.exec.PumpStreamHandler; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.InterpreterResultMessage; -import org.apache.zeppelin.interpreter.InvalidHookException; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; -import 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; +import org.apache.zeppelin.python.IPythonInterpreter; +import org.apache.zeppelin.python.PythonInterpreter; import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import py4j.GatewayServer; -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.PipedInputStream; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Properties; /** @@ -68,56 +47,23 @@ import java.util.Properties; * features compared to IPySparkInterpreter, but requires less prerequisites than * IPySparkInterpreter, only python is required. 
*/ -public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class); - private static final int MAX_TIMEOUT_SEC = 10; +public class PySparkInterpreter extends PythonInterpreter { + + private static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class); - private GatewayServer gatewayServer; - private DefaultExecutor executor; - // used to forward output from python process to InterpreterOutput - private InterpreterOutputStream outputStream; - private String scriptPath; - private boolean pythonscriptRunning = false; - private long pythonPid = -1; - private IPySparkInterpreter iPySparkInterpreter; private SparkInterpreter sparkInterpreter; public PySparkInterpreter(Properties property) { super(property); +this.useBuiltinPy4j = false; } @Override public void open() throws InterpreterException { -// try IPySparkInterpreter first -iPySparkInterpreter = getIPySparkInterpreter(); -if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") && -StringUtils.isEmpty( - iPySparkInterpreter.checkIPythonPrerequisite(getPythonExec(getProperties() { - try { -iPySparkInterpreter.open(); -LOGGER.info("IPython is available, Use IPySparkInterpreter to replace PySparkInterpreter"); -return; - } catch (Exception e) { -iPySparkInterpreter = null; -LOGGER.warn("Fail to open IPySparkInterpreter", e); - } -} +setProperty("zeppelin.python.useIPython", getProperty("zeppelin.pyspark.useIPython", "true")); -// reset iPySparkInterpreter to null as it is not available -iPySparkInterpreter = null; -LOGGER.info("IPython is not available, use the native PySparkInterpreter\n"); -// Add matplotlib display hook -InterpreterGroup
zeppelin git commit: [SECURITY] Secure connection between R process and JVM process
Repository: zeppelin Updated Branches: refs/heads/master 9fbcacf5c -> b7d98b3f1 [SECURITY] Secure connection between R process and JVM process (cherry picked from commit 40f1c77f10eb5300e441142e0fa3097b1123b05b) Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b7d98b3f Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b7d98b3f Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b7d98b3f Branch: refs/heads/master Commit: b7d98b3f119c7dcdfc7064cd56be8fdb8bbaabd9 Parents: 9fbcacf Author: Jeff Zhang Authored: Mon Jun 4 15:29:45 2018 +0800 Committer: Jeff Zhang Committed: Fri Jun 8 16:28:27 2018 +0800 -- .../zeppelin/spark/SparkRInterpreter.java | 16 ++--- .../org/apache/zeppelin/spark/SparkVersion.java | 4 .../org/apache/zeppelin/spark/ZeppelinR.java| 5 +++- .../src/main/resources/R/zeppelin_sparkr.R | 12 -- .../scala/org/apache/spark/SparkRBackend.scala | 25 +--- 5 files changed, 42 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b7d98b3f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java -- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 896f3a1..6d21450 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -53,6 +53,7 @@ public class SparkRInterpreter extends Interpreter { private AtomicBoolean rbackendDead = new AtomicBoolean(false); private SparkContext sc; private JavaSparkContext jsc; + private String secret; public SparkRInterpreter(Properties property) { super(property); @@ -76,19 +77,18 @@ public class SparkRInterpreter extends Interpreter { sparkRLibPath = "sparkr"; } +this.sparkInterpreter = getSparkInterpreter(); +this.sc = 
sparkInterpreter.getSparkContext(); +this.jsc = sparkInterpreter.getJavaSparkContext(); + // Share the same SparkRBackend across sessions +SparkVersion sparkVersion = new SparkVersion(sc.version()); synchronized (SparkRBackend.backend()) { if (!SparkRBackend.isStarted()) { -SparkRBackend.init(); +SparkRBackend.init(sparkVersion); SparkRBackend.start(); } } - -int port = SparkRBackend.port(); -this.sparkInterpreter = getSparkInterpreter(); -this.sc = sparkInterpreter.getSparkContext(); -this.jsc = sparkInterpreter.getJavaSparkContext(); -SparkVersion sparkVersion = new SparkVersion(sc.version()); this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0); int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000); @@ -100,7 +100,7 @@ public class SparkRInterpreter extends Interpreter { ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext()); -zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion, timeout, this); +zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this); try { zeppelinR.open(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b7d98b3f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java -- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java index 09ea332..5e412eb 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -34,6 +34,7 @@ public class SparkVersion { public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0"); public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0"); + public static final SparkVersion SPARK_2_3_1 = 
SparkVersion.fromVersionString("2.3.1"); public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0"); public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0; @@ -108,6 +109,9 @@ public class SparkVersion { return this.olderThan(SPARK_1_3_0); } + public boolean isSecretSocketSupported() { +return this.newerThanEquals(SPARK_2_3_1); + } public boolean equals(Object versionToCompare) { return version ==
[zeppelin] Git Push Summary
Repository: zeppelin Updated Branches: refs/heads/sparkr [deleted] 40f1c77f1