Repository: zeppelin Updated Branches: refs/heads/master a1e69add4 -> 5f88452d6
ZEPPELIN-3362. Unify ZeppelinContext of PythonInterpreter & IPythonInterpreter ### What is this PR for? Unify the ZeppelinContext of PythonInterpreter & IPythonInterpreter into one file to avoid code duplication. ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3362 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2890 from zjffdu/ZEPPELIN-3362 and squashes the following commits: b5dcbc9 [Jeff Zhang] ZEPPELIN-3362. Unify ZeppelinContext of PythonInterpreter & IPythonInterpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5f88452d Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5f88452d Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5f88452d Branch: refs/heads/master Commit: 5f88452d63f9b6d7ad00328b1c5a0619e47cbc3c Parents: a1e69ad Author: Jeff Zhang <zjf...@apache.org> Authored: Fri Mar 23 17:23:59 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Sun Mar 25 14:42:21 2018 +0800 ---------------------------------------------------------------------- .../zeppelin/python/IPythonInterpreter.java | 16 ++ .../zeppelin/python/PythonInterpreter.java | 9 +- .../resources/grpc/python/zeppelin_python.py | 122 ---------- .../main/resources/python/zeppelin_context.py | 224 +++++++++++++++++++ .../main/resources/python/zeppelin_python.py | 182 +-------------- .../zeppelin/python/IPythonInterpreterTest.java | 10 +- python/src/test/resources/log4j.properties | 1 + .../zeppelin/spark/PySparkInterpreter.java | 10 + .../main/resources/python/zeppelin_ipyspark.py | 6 +- .../main/resources/python/zeppelin_pyspark.py | 168 ++------------ .../apache/zeppelin/plugin/PluginManager.java | 3 +- 11 files changed, 289 insertions(+), 462 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java index 5c5bfe3..4fe50ee 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java @@ -210,6 +210,22 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand throw new IOException("Fail to setup JVMGateway\n" + response.getOutput()); } + input = + getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py"); + lines = IOUtils.readLines(input); + response = ipythonClient.block_execute(ExecuteRequest.newBuilder() + .setCode(StringUtils.join(lines, System.lineSeparator())).build()); + if (response.getStatus() == ExecuteStatus.ERROR) { + throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput()); + } + + response = ipythonClient.block_execute(ExecuteRequest.newBuilder() + .setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)") + .build()); + if (response.getStatus() == ExecuteStatus.ERROR) { + throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput()); + } + if (additionalPythonInitFile != null) { input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile); lines = IOUtils.readLines(input); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index fab2ed9..178f79a 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -65,6 +65,7 @@ import py4j.GatewayServer; public class PythonInterpreter extends Interpreter implements ExecuteResultHandler { private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class); public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py"; + public static final String ZEPPELIN_CONTEXT = "python/zeppelin_context.py"; public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src"; public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python"; public static final String DEFAULT_ZEPPELIN_PYTHON = "python"; @@ -125,7 +126,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl } copyFile(out, ZEPPELIN_PYTHON); - logger.info("File {} created", scriptPath); + // copy zeppelin_context.py as well + File zOut = new File(out.getParent() + "/zeppelin_context.py"); + copyFile(zOut, ZEPPELIN_CONTEXT); + + logger.info("File {} , {} created", scriptPath, zOut.getAbsolutePath()); } public String getScriptPath() { @@ -181,7 +186,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl cmd.addArgument(getLocalIp(), false); executor = new DefaultExecutor(); - outputStream = new InterpreterOutputStream(logger); + outputStream = new InterpreterOutputStream(LOG); PipedOutputStream ps = new PipedOutputStream(); in = null; try { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/main/resources/grpc/python/zeppelin_python.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/grpc/python/zeppelin_python.py b/python/src/main/resources/grpc/python/zeppelin_python.py index d76bdf4..22fc2a0 100644 --- a/python/src/main/resources/grpc/python/zeppelin_python.py +++ b/python/src/main/resources/grpc/python/zeppelin_python.py @@ -17,130 +17,8 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient -from io import BytesIO -try: - from StringIO import StringIO -except ImportError: - from io import StringIO - -class PyZeppelinContext(object): - """ A context impl that uses Py4j to communicate to JVM - """ - - def __init__(self, z): - self.z = z - self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption - self.javaList = gateway.jvm.java.util.ArrayList - self.max_result = z.getMaxResult() - - def getInterpreterContext(self): - return self.z.getInterpreterContext() - - def input(self, name, defaultValue=""): - return self.z.input(name, defaultValue) - - def textbox(self, name, defaultValue=""): - return self.z.textbox(name, defaultValue) - - def noteTextbox(self, name, defaultValue=""): - return self.z.noteTextbox(name, defaultValue) - - def select(self, name, options, defaultValue=""): - return self.z.select(name, defaultValue, self.getParamOptions(options)) - - def noteSelect(self, name, options, defaultValue=""): - return self.z.noteSelect(name, defaultValue, self.getParamOptions(options)) - - def checkbox(self, name, options, defaultChecked=[]): - return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options)) - - def noteCheckbox(self, name, options, defaultChecked=[]): - return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options)) - - def getParamOptions(self, options): - javaOptions = gateway.new_array(self.paramOption, len(options)) - i = 0 - for tuple in options: - javaOptions[i] = self.paramOption(tuple[0], tuple[1]) - i += 1 - return javaOptions - - def getDefaultChecked(self, defaultChecked): - javaDefaultChecked = self.javaList() - for check in defaultChecked: - javaDefaultChecked.append(check) - return javaDefaultChecked - - def show(self, p, **kwargs): - if type(p).__name__ == "DataFrame": # does not play well with sub-classes - # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` - # and so a dependency on pandas - self.show_dataframe(p, **kwargs) - elif hasattr(p, '__call__'): - p() #error reporting - - def show_dataframe(self, df, show_index=False, **kwargs): - """Pretty prints DF using Table Display System - """ - limit = len(df) > self.max_result - header_buf = StringIO("") - if show_index: - idx_name = str(df.index.name) if df.index.name is not None else "" - header_buf.write(idx_name + "\t") - header_buf.write(str(df.columns[0])) - for col in df.columns[1:]: - header_buf.write("\t") - header_buf.write(str(col)) - header_buf.write("\n") - - body_buf = StringIO("") - rows = df.head(self.max_result).values if limit else df.values - index = df.index.values - for idx, row in zip(index, rows): - if show_index: - body_buf.write("%html <strong>{}</strong>".format(idx)) - body_buf.write("\t") - body_buf.write(str(row[0])) - for cell in row[1:]: - body_buf.write("\t") - body_buf.write(str(cell)) - body_buf.write("\n") - body_buf.seek(0); header_buf.seek(0) - #TODO(bzz): fix it, so it shows red notice, as in Spark - print("%table " + header_buf.read() + body_buf.read()) # + - # ("\n<font color=red>Results are limited by {}.</font>" \ - # .format(self.max_result) if limit else "") - #) - body_buf.close(); header_buf.close() - - def registerHook(self, event, cmd, replName=None): - if replName is None: - self.z.registerHook(event, cmd) - else: - self.z.registerHook(event, cmd, replName) - - def unregisterHook(self, event, replName=None): - if replName is None: - self.z.unregisterHook(event) - else: - self.z.unregisterHook(event, replName) - - def registerNoteHook(self, event, cmd, noteId, replName=None): - if replName is None: - self.z.registerNoteHook(event, cmd, noteId) - else: - self.z.registerNoteHook(event, cmd, noteId, replName) - - def unregisterNoteHook(self, event, noteId, replName=None): - if replName is None: - self.z.unregisterNoteHook(event, noteId) - else: - self.z.unregisterNoteHook(event, noteId, replName) - # start JVM gateway client = GatewayClient(address='127.0.0.1', port=${JVM_GATEWAY_PORT}) gateway = JavaGateway(client) java_import(gateway.jvm, "org.apache.zeppelin.display.Input") intp = gateway.entry_point -z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext()) - http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/main/resources/python/zeppelin_context.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/python/zeppelin_context.py b/python/src/main/resources/python/zeppelin_context.py new file mode 100644 index 0000000..d97a789 --- /dev/null +++ b/python/src/main/resources/python/zeppelin_context.py @@ -0,0 +1,224 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os, sys +import warnings + +from io import BytesIO + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +class PyZeppelinContext(object): + """ A context impl that uses Py4j to communicate to JVM + """ + + def __init__(self, z, gateway): + self.z = z + self.gateway = gateway + self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption + self.javaList = gateway.jvm.java.util.ArrayList + self.max_result = 1000 + self._displayhook = lambda *args: None + self._setup_matplotlib() + + # By implementing special methods it makes operating on it more Pythonic + def __setitem__(self, key, item): + self.z.put(key, item) + + def __getitem__(self, key): + return self.z.get(key) + + def __delitem__(self, key): + self.z.remove(key) + + def __contains__(self, item): + return self.z.containsKey(item) + + def add(self, key, value): + self.__setitem__(key, value) + + def put(self, key, value): + self.__setitem__(key, value) + + def get(self, key): + return self.__getitem__(key) + + def getInterpreterContext(self): + return self.z.getInterpreterContext() + + def input(self, name, defaultValue=""): + return self.z.input(name, defaultValue) + + def textbox(self, name, defaultValue=""): + return self.z.textbox(name, defaultValue) + + def noteTextbox(self, name, defaultValue=""): + return self.z.noteTextbox(name, defaultValue) + + def select(self, name, options, defaultValue=""): + return self.z.select(name, defaultValue, self.getParamOptions(options)) + + def noteSelect(self, name, options, defaultValue=""): + return self.z.noteSelect(name, defaultValue, self.getParamOptions(options)) + + def checkbox(self, name, options, defaultChecked=[]): + return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options)) + + def noteCheckbox(self, name, options, defaultChecked=[]): + return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options)) + + def registerHook(self, event, cmd, replName=None): + if replName is None: + self.z.registerHook(event, cmd) + else: + self.z.registerHook(event, cmd, replName) + + def unregisterHook(self, event, replName=None): + if replName is None: + self.z.unregisterHook(event) + else: + self.z.unregisterHook(event, replName) + + def registerNoteHook(self, event, cmd, noteId, replName=None): + if replName is None: + self.z.registerNoteHook(event, cmd, noteId) + else: + self.z.registerNoteHook(event, cmd, noteId, replName) + + def unregisterNoteHook(self, event, noteId, replName=None): + if replName is None: + self.z.unregisterNoteHook(event, noteId) + else: + self.z.unregisterNoteHook(event, noteId, replName) + + def getParamOptions(self, options): + javaOptions = self.gateway.new_array(self.paramOption, len(options)) + i = 0 + for tuple in options: + javaOptions[i] = self.paramOption(tuple[0], tuple[1]) + i += 1 + return javaOptions + + def getDefaultChecked(self, defaultChecked): + javaDefaultChecked = self.javaList() + for check in defaultChecked: + javaDefaultChecked.append(check) + return javaDefaultChecked + + def show(self, p, **kwargs): + if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot": + self.show_matplotlib(p, **kwargs) + elif type(p).__name__ == "DataFrame": # does not play well with sub-classes + # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` + # and so a dependency on pandas + self.show_dataframe(p, **kwargs) + elif hasattr(p, '__call__'): + p() #error reporting + + def show_dataframe(self, df, show_index=False, **kwargs): + """Pretty prints DF using Table Display System + """ + limit = len(df) > self.max_result + header_buf = StringIO("") + if show_index: + idx_name = str(df.index.name) if df.index.name is not None else "" + header_buf.write(idx_name + "\t") + header_buf.write(str(df.columns[0])) + for col in df.columns[1:]: + header_buf.write("\t") + header_buf.write(str(col)) + header_buf.write("\n") + + body_buf = StringIO("") + rows = df.head(self.max_result).values if limit else df.values + index = df.index.values + for idx, row in zip(index, rows): + if show_index: + body_buf.write("%html <strong>{}</strong>".format(idx)) + body_buf.write("\t") + body_buf.write(str(row[0])) + for cell in row[1:]: + body_buf.write("\t") + body_buf.write(str(cell)) + body_buf.write("\n") + body_buf.seek(0); header_buf.seek(0) + #TODO(bzz): fix it, so it shows red notice, as in Spark + print("%table " + header_buf.read() + body_buf.read()) # + + # ("\n<font color=red>Results are limited by {}.</font>" \ + # .format(self.max_result) if limit else "") + #) + body_buf.close(); header_buf.close() + + def show_matplotlib(self, p, fmt="png", width="auto", height="auto", + **kwargs): + """Matplotlib show function + """ + if fmt == "png": + img = BytesIO() + p.savefig(img, format=fmt) + img_str = b"data:image/png;base64," + img_str += base64.b64encode(img.getvalue().strip()) + img_tag = "<img src={img} style='width={width};height:{height}'>" + # Decoding is necessary for Python 3 compability + img_str = img_str.decode("ascii") + img_str = img_tag.format(img=img_str, width=width, height=height) + elif fmt == "svg": + img = StringIO() + p.savefig(img, format=fmt) + img_str = img.getvalue() + else: + raise ValueError("fmt must be 'png' or 'svg'") + + html = "%html <div style='width:{width};height:{height}'>{img}<div>" + print(html.format(width=width, height=height, img=img_str)) + img.close() + + def configure_mpl(self, **kwargs): + import mpl_config + mpl_config.configure(**kwargs) + + def _setup_matplotlib(self): + # If we don't have matplotlib installed don't bother continuing + try: + import matplotlib + except ImportError: + return + + # Make sure custom backends are available in the PYTHONPATH + rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) + mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') + if mpl_path not in sys.path: + sys.path.append(mpl_path) + + # Finally check if backend exists, and if so configure as appropriate + try: + matplotlib.use('module://backend_zinline') + import backend_zinline + + # Everything looks good so make config assuming that we are using + # an inline backend + self._displayhook = backend_zinline.displayhook + self.configure_mpl(width=600, height=400, dpi=72, fontsize=10, + interactive=True, format='png', context=self.z) + except ImportError: + # Fall back to Agg if no custom backend installed + matplotlib.use('Agg') + warnings.warn("Unable to load inline matplotlib backend, " + "falling back to Agg") http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/main/resources/python/zeppelin_python.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py index cc4cb79..0b2d533 100644 --- a/python/src/main/resources/python/zeppelin_python.py +++ b/python/src/main/resources/python/zeppelin_python.py @@ -47,182 +47,6 @@ class Logger(object): def flush(self): pass - -class PyZeppelinContext(object): - """ A context impl that uses Py4j to communicate to JVM - """ - - def __init__(self, z): - self.z = z - self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption - self.javaList = gateway.jvm.java.util.ArrayList - self.max_result = 1000 - self._displayhook = lambda *args: None - self._setup_matplotlib() - - def getInterpreterContext(self): - return self.z.getInterpreterContext() - - def input(self, name, defaultValue=""): - return self.z.input(name, defaultValue) - - def textbox(self, name, defaultValue=""): - return self.z.textbox(name, defaultValue) - - def noteTextbox(self, name, defaultValue=""): - return self.z.noteTextbox(name, defaultValue) - - def select(self, name, options, defaultValue=""): - return self.z.select(name, defaultValue, self.getParamOptions(options)) - - def noteSelect(self, name, options, defaultValue=""): - return self.z.noteSelect(name, defaultValue, self.getParamOptions(options)) - - def checkbox(self, name, options, defaultChecked=[]): - return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options)) - - def noteCheckbox(self, name, options, defaultChecked=[]): - return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options)) - - def registerHook(self, event, cmd, replName=None): - if replName is None: - self.z.registerHook(event, cmd) - else: - self.z.registerHook(event, cmd, replName) - - def unregisterHook(self, event, replName=None): - if replName is None: - self.z.unregisterHook(event) - else: - self.z.unregisterHook(event, replName) - - def registerNoteHook(self, event, cmd, noteId, replName=None): - if replName is None: - self.z.registerNoteHook(event, cmd, noteId) - else: - self.z.registerNoteHook(event, cmd, noteId, replName) - - def unregisterNoteHook(self, event, noteId, replName=None): - if replName is None: - self.z.unregisterNoteHook(event, noteId) - else: - self.z.unregisterNoteHook(event, noteId, replName) - - def getParamOptions(self, options): - javaOptions = gateway.new_array(self.paramOption, len(options)) - i = 0 - for tuple in options: - javaOptions[i] = self.paramOption(tuple[0], tuple[1]) - i += 1 - return javaOptions - - def getDefaultChecked(self, defaultChecked): - javaDefaultChecked = self.javaList() - for check in defaultChecked: - javaDefaultChecked.append(check) - return javaDefaultChecked - - def show(self, p, **kwargs): - if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot": - self.show_matplotlib(p, **kwargs) - elif type(p).__name__ == "DataFrame": # does not play well with sub-classes - # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` - # and so a dependency on pandas - self.show_dataframe(p, **kwargs) - elif hasattr(p, '__call__'): - p() #error reporting - - def show_dataframe(self, df, show_index=False, **kwargs): - """Pretty prints DF using Table Display System - """ - limit = len(df) > self.max_result - header_buf = StringIO("") - if show_index: - idx_name = str(df.index.name) if df.index.name is not None else "" - header_buf.write(idx_name + "\t") - header_buf.write(str(df.columns[0])) - for col in df.columns[1:]: - header_buf.write("\t") - header_buf.write(str(col)) - header_buf.write("\n") - - body_buf = StringIO("") - rows = df.head(self.max_result).values if limit else df.values - index = df.index.values - for idx, row in zip(index, rows): - if show_index: - body_buf.write("%html <strong>{}</strong>".format(idx)) - body_buf.write("\t") - body_buf.write(str(row[0])) - for cell in row[1:]: - body_buf.write("\t") - body_buf.write(str(cell)) - body_buf.write("\n") - body_buf.seek(0); header_buf.seek(0) - #TODO(bzz): fix it, so it shows red notice, as in Spark - print("%table " + header_buf.read() + body_buf.read()) # + - # ("\n<font color=red>Results are limited by {}.</font>" \ - # .format(self.max_result) if limit else "") - #) - body_buf.close(); header_buf.close() - - def show_matplotlib(self, p, fmt="png", width="auto", height="auto", - **kwargs): - """Matplotlib show function - """ - if fmt == "png": - img = BytesIO() - p.savefig(img, format=fmt) - img_str = b"data:image/png;base64," - img_str += base64.b64encode(img.getvalue().strip()) - img_tag = "<img src={img} style='width={width};height:{height}'>" - # Decoding is necessary for Python 3 compability - img_str = img_str.decode("ascii") - img_str = img_tag.format(img=img_str, width=width, height=height) - elif fmt == "svg": - img = StringIO() - p.savefig(img, format=fmt) - img_str = img.getvalue() - else: - raise ValueError("fmt must be 'png' or 'svg'") - - html = "%html <div style='width:{width};height:{height}'>{img}<div>" - print(html.format(width=width, height=height, img=img_str)) - img.close() - - def configure_mpl(self, **kwargs): - import mpl_config - mpl_config.configure(**kwargs) - - def _setup_matplotlib(self): - # If we don't have matplotlib installed don't bother continuing - try: - import matplotlib - except ImportError: - return - # Make sure custom backends are available in the PYTHONPATH - rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) - mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') - if mpl_path not in sys.path: - sys.path.append(mpl_path) - - # Finally check if backend exists, and if so configure as appropriate - try: - matplotlib.use('module://backend_zinline') - import backend_zinline - - # Everything looks good so make config assuming that we are using - # an inline backend - self._displayhook = backend_zinline.displayhook - self.configure_mpl(width=600, height=400, dpi=72, - fontsize=10, interactive=True, format='png') - except ImportError: - # Fall back to Agg if no custom backend installed - matplotlib.use('Agg') - warnings.warn("Unable to load inline matplotlib backend, " - "falling back to Agg") - - def handler_stop_signals(sig, frame): sys.exit("Got signal : " + str(sig)) @@ -236,14 +60,16 @@ if len(sys.argv) >= 3: _zcUserQueryNameSpace = {} client = GatewayClient(address=host, port=int(sys.argv[1])) -#gateway = JavaGateway(client, auto_convert = True) gateway = JavaGateway(client) intp = gateway.entry_point intp.onPythonScriptInitialized(os.getpid()) java_import(gateway.jvm, "org.apache.zeppelin.display.Input") -z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext()) + +from zeppelin_context import PyZeppelinContext + +z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway) __zeppelin__._setup_matplotlib() _zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index 2d5d832..75f1c06 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -78,7 +78,7 @@ public class IPythonInterpreterTest { @Test public void testGrpcFrameSize() throws InterpreterException, IOException { Properties properties = new Properties(); - properties.setProperty("zeppelin.ipython.grpc.message_size", "4"); + properties.setProperty("zeppelin.ipython.grpc.message_size", "200"); startInterpreter(properties); // to make this test can run under both python2 and python3 @@ -86,11 +86,11 @@ public class IPythonInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); InterpreterContext context = getInterpreterContext(); - result = interpreter.interpret("print(11111111111111111111111111111)", context); + result = interpreter.interpret("print('1'*300)", context); assertEquals(InterpreterResult.Code.ERROR, result.code()); List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage(); assertEquals(1, interpreterResultMessages.size()); - assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 32 exceeds maximum: 4")); + assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 304 exceeds maximum: 200")); // next call continue work result = interpreter.interpret("print(1)", context); @@ -99,14 +99,14 @@ public class IPythonInterpreterTest { close(); // increase framesize to make it work - properties.setProperty("zeppelin.ipython.grpc.message_size", "40"); + properties.setProperty("zeppelin.ipython.grpc.message_size", "500"); startInterpreter(properties); // to make this test can run under both python2 and python3 result = interpreter.interpret("from __future__ import print_function", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); context = getInterpreterContext(); - result = interpreter.interpret("print(11111111111111111111111111111)", context); + result = interpreter.interpret("print('1'*300)", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/python/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties index a8e2c44..035c7a3 100644 --- a/python/src/test/resources/log4j.properties +++ b/python/src/test/resources/log4j.properties @@ -29,3 +29,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n log4j.rootLogger=INFO, stdout log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG +log4j.logger.org.apache.zeppelin.python=DEBUG \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index a838667..d97bb51 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -114,6 +114,16 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand throw new InterpreterException(e); } + try { + FileOutputStream outStream = new FileOutputStream(out.getParent() + "/zeppelin_context.py"); + IOUtils.copy( + classLoader.getResourceAsStream("python/zeppelin_context.py"), + outStream); + outStream.close(); + } catch (IOException e) { + throw new InterpreterException(e); + } + LOGGER.info("File {} created", scriptPath); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py index 5723f45..ad9171a 100644 --- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py +++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py @@ -54,8 +54,8 @@ else: class IPySparkZeppelinContext(PyZeppelinContext): - def __init__(self, z): - super(IPySparkZeppelinContext, self).__init__(z) + def __init__(self, z, gateway): + super(IPySparkZeppelinContext, self).__init__(z, gateway) def show(self, obj): from pyspark.sql import DataFrame @@ -64,4 +64,4 @@ class IPySparkZeppelinContext(PyZeppelinContext): else: super(IPySparkZeppelinContext, self).show(obj) -z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext()) +z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext(), gateway) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py index 00d8a9a..614c516 100644 --- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py @@ -41,155 +41,6 @@ class Logger(object): pass -class PyZeppelinContext(dict): - def __init__(self, zc): - self.z = zc - self._displayhook = lambda *args: None - - def show(self, obj): - from pyspark.sql import DataFrame - if isinstance(obj, DataFrame): - print(self.z.showData(obj._jdf)) - else: - print(str(obj)) - - # By implementing special methods it makes operating on it more Pythonic - def __setitem__(self, key, item): - self.z.put(key, item) - - def __getitem__(self, key): - return self.z.get(key) - - def __delitem__(self, key): - self.z.remove(key) - - def __contains__(self, item): - return self.z.containsKey(item) - - def add(self, key, value): - self.__setitem__(key, value) - - def put(self, key, value): - self.__setitem__(key, value) - - def get(self, key): - return self.__getitem__(key) - - def getInterpreterContext(self): - return self.z.getInterpreterContext() - - def input(self, name, defaultValue=""): - return self.z.input(name, defaultValue) - - def textbox(self, name, defaultValue=""): - return self.z.textbox(name, defaultValue) - - def noteTextbox(self, name, defaultValue=""): - return self.z.noteTextbox(name, defaultValue) - - def select(self, name, options, defaultValue=""): - # auto_convert to ArrayList doesn't match the method signature on JVM side - return self.z.select(name, defaultValue, self.getParamOptions(options)) - - def noteSelect(self, name, options, defaultValue=""): - return self.z.noteSelect(name, defaultValue, self.getParamOptions(options)) - - def checkbox(self, name, options, defaultChecked=None): - optionsIterable = self.getParamOptions(options) - defaultCheckedIterables = self.getDefaultChecked(defaultChecked) - checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.checkbox(name, defaultCheckedIterables, optionsIterable)) - result = [] - for checkedItem in checkedItems: - result.append(checkedItem) - return result; - - def noteCheckbox(self, name, options, defaultChecked=None): - optionsIterable = self.getParamOptions(options) - defaultCheckedIterables = self.getDefaultChecked(defaultChecked) - checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.noteCheckbox(name, defaultCheckedIterables, optionsIterable)) - result = [] - for checkedItem in checkedItems: - result.append(checkedItem) - return result; - - def getParamOptions(self, options): - tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options)) - return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples) - - def getDefaultChecked(self, defaultChecked): - if defaultChecked is None: - defaultChecked = [] - return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(defaultChecked) - - def registerHook(self, event, cmd, replName=None): - if replName is None: - self.z.registerHook(event, cmd) - else: - self.z.registerHook(event, cmd, replName) - - def unregisterHook(self, event, replName=None): - if replName is None: - self.z.unregisterHook(event) - else: - self.z.unregisterHook(event, replName) - - def registerNoteHook(self, event, cmd, noteId, replName=None): - if replName is None: - self.z.registerNoteHook(event, cmd, noteId) - else: - self.z.registerNoteHook(event, cmd, noteId, replName) - - def unregisterNoteHook(self, event, noteId, replName=None): - if replName is None: - self.z.unregisterNoteHook(event, noteId) - else: - self.z.unregisterNoteHook(event, noteId, replName) - - def getHook(self, event, replName=None): - if replName is None: - return self.z.getHook(event) - return self.z.getHook(event, replName) - - def _setup_matplotlib(self): - # If we don't have matplotlib installed don't bother continuing - try: - import matplotlib - except ImportError: - return - - # Make sure custom backends are available in the PYTHONPATH - rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) - mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') - if mpl_path not in sys.path: - sys.path.append(mpl_path) - - # Finally check if backend exists, and if so configure as appropriate - try: - matplotlib.use('module://backend_zinline') - import backend_zinline - - # Everything looks good so make config assuming that we are using - # an inline backend - self._displayhook = backend_zinline.displayhook - self.configure_mpl(width=600, height=400, dpi=72, fontsize=10, - interactive=True, format='png', context=self.z) - except ImportError: - # Fall back to Agg if no custom backend installed - matplotlib.use('Agg') - warnings.warn("Unable to load inline matplotlib backend, " - "falling back to Agg") - - def configure_mpl(self, **kwargs): - import mpl_config - mpl_config.configure(**kwargs) - - def __tupleToScalaTuple2(self, tuple): - if (len(tuple) == 2): - return gateway.jvm.scala.Tuple2(tuple[0], tuple[1]) - else: - raise IndexError("options must be a list of tuple of 2") - - class SparkVersion(object): SPARK_1_4_0 = 10400 SPARK_1_3_0 = 10300 @@ -322,7 +173,24 @@ completion = __zeppelin_completion__ = PySparkCompletion(intp) _zcUserQueryNameSpace["completion"] = completion _zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__ -z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext()) + +from zeppelin_context import PyZeppelinContext + +#TODO(zjffdu) merge it with IPySparkZeppelinContext +class PySparkZeppelinContext(PyZeppelinContext): + + def __init__(self, z, gateway): + super(PySparkZeppelinContext, self).__init__(z, gateway) + + def show(self, obj): + from pyspark.sql import DataFrame + if isinstance(obj, DataFrame): + print(self.z.showData(obj._jdf)) + else: + super(PySparkZeppelinContext, self).show(obj) + +z = __zeppelin__ = PySparkZeppelinContext(intp.getZeppelinContext(), gateway) + __zeppelin__._setup_matplotlib() _zcUserQueryNameSpace["z"] = z _zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f88452d/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java index d01e728..b2f300a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java @@ -60,8 +60,7 @@ public class PluginManager { (Class.forName(notebookRepoClassName).newInstance()); return notebookRepo; } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - LOGGER.warn("Fail to instantiate notebookrepo:" + notebookRepoClassName, e); - return null; + LOGGER.warn("Fail to instantiate notebookrepo from classpath directly:" + notebookRepoClassName, e); } }