ZEPPELIN-3375: Make PySparkInterpreter extends PythonInterpreter ### What is this PR for? This PR removes the code duplication between PySparkInterpreter and PythonInterpreter. Here are the main changes in this PR: * PySparkInterpreter extends PythonInterpreter * PySparkInterpreterTest extends PythonInterpreterTest so that we can verify PySparkInterpreter can do whatever PythonInterpreter can do * Move interpreter/lib/python/backend_zinline.py and interpreter/lib/python/mpl_config.py into the python module, so that the python module can ship these resources together.
### What type of PR is it? [ Improvement | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3375 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2919 from zjffdu/ZEPPELIN-3375 and squashes the following commits: 738c6c5 [Jeff Zhang] ZEPPELIN-3375. Make PySparkInterpreter extends PythonInterpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/0a97446a Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/0a97446a Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/0a97446a Branch: refs/heads/master Commit: 0a97446a70f6294a3efb071bb9a70601f885840b Parents: 7aa94ce Author: Jeff Zhang <zjf...@apache.org> Authored: Sun Apr 1 21:26:28 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Mon Apr 16 14:17:54 2018 +0800 ---------------------------------------------------------------------- interpreter/lib/python/backend_zinline.py | 317 -------- interpreter/lib/python/mpl_config.py | 105 --- python/pom.xml | 25 - .../zeppelin/python/IPythonInterpreter.java | 25 +- .../zeppelin/python/PythonCondaInterpreter.java | 77 +- .../python/PythonDockerInterpreter.java | 19 +- .../zeppelin/python/PythonInterpreter.java | 760 ++++++++++--------- .../python/PythonInterpreterPandasSql.java | 2 +- .../main/resources/python/backend_zinline.py | 317 ++++++++ python/src/main/resources/python/mpl_config.py | 105 +++ .../main/resources/python/py4j-src-0.9.2.zip | Bin 0 -> 71355 bytes .../main/resources/python/zeppelin_python.py | 159 ++-- .../python/BasePythonInterpreterTest.java | 331 ++++++++ .../zeppelin/python/IPythonInterpreterTest.java | 383 ++-------- .../python/PythonCondaInterpreterTest.java | 6 +- 
.../python/PythonDockerInterpreterTest.java | 23 +- .../zeppelin/python/PythonInterpreterTest.java | 165 ++-- python/src/test/resources/log4j.properties | 17 +- spark/interpreter/pom.xml | 4 +- .../zeppelin/spark/IPySparkInterpreter.java | 16 +- .../zeppelin/spark/NewSparkInterpreter.java | 7 +- .../zeppelin/spark/PySparkInterpreter.java | 581 ++------------ .../main/resources/python/zeppelin_pyspark.py | 215 +----- .../zeppelin/spark/IPySparkInterpreterTest.java | 87 ++- .../zeppelin/spark/OldSparkInterpreterTest.java | 4 +- .../zeppelin/spark/PySparkInterpreterTest.java | 169 ++--- .../zeppelin/spark/SparkRInterpreterTest.java | 16 +- .../src/test/resources/log4j.properties | 6 +- .../zeppelin/interpreter/InterpreterGroup.java | 13 + .../zeppelin/rest/ZeppelinSparkClusterTest.java | 17 +- .../interpreter/InterpreterSetting.java | 3 +- 31 files changed, 1678 insertions(+), 2296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/interpreter/lib/python/backend_zinline.py ---------------------------------------------------------------------- diff --git a/interpreter/lib/python/backend_zinline.py b/interpreter/lib/python/backend_zinline.py deleted file mode 100644 index 1c84699..0000000 --- a/interpreter/lib/python/backend_zinline.py +++ /dev/null @@ -1,317 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This file provides a static (non-interactive) matplotlib plotting backend -# for zeppelin notebooks for use with the python/pyspark interpreters - -from __future__ import print_function - -import sys -import uuid -import warnings -import base64 -from io import BytesIO -try: - from StringIO import StringIO -except ImportError: - from io import StringIO - -import mpl_config -import matplotlib -from matplotlib._pylab_helpers import Gcf -from matplotlib.backends.backend_agg import new_figure_manager, FigureCanvasAgg -from matplotlib.backend_bases import ShowBase, FigureManagerBase -from matplotlib.figure import Figure - -######################################################################## -# -# The following functions and classes are for pylab and implement -# window/figure managers, etc... -# -######################################################################## - -class Show(ShowBase): - """ - A callable object that displays the figures to the screen. Valid kwargs - include figure width and height (in units supported by the div tag), block - (allows users to override blocking behavior regardless of whether or not - interactive mode is enabled, currently unused) and close (Implicitly call - matplotlib.pyplot.close('all') with each call to show()). 
- """ - def __call__(self, close=None, block=None, **kwargs): - if close is None: - close = mpl_config.get('close') - try: - managers = Gcf.get_all_fig_managers() - if not managers: - return - - # Tell zeppelin that the output will be html using the %html magic - # We want to do this only once to avoid seeing "%html" printed - # directly to the outout when multiple figures are displayed from - # one paragraph. - if mpl_config.get('angular'): - print('%angular') - else: - print('%html') - - # Show all open figures - for manager in managers: - manager.show(**kwargs) - finally: - # This closes all the figures if close is set to True. - if close and Gcf.get_all_fig_managers(): - Gcf.destroy_all() - - -class FigureCanvasZInline(FigureCanvasAgg): - """ - The canvas the figure renders into. Calls the draw and print fig - methods, creates the renderers, etc... - """ - def get_bytes(self, **kwargs): - """ - Get the byte representation of the figure. - Should only be used with jpg/png formats. - """ - # Make sure format is correct - fmt = kwargs.get('format', mpl_config.get('format')) - if fmt == 'svg': - raise ValueError("get_bytes() does not support svg, use png or jpg") - - # Express the image as bytes - buf = BytesIO() - self.print_figure(buf, **kwargs) - fmt = fmt.encode() - if sys.version_info >= (3, 4) and sys.version_info < (3, 5): - byte_str = bytes("data:image/%s;base64," %fmt, "utf-8") - else: - byte_str = b"data:image/%s;base64," %fmt - byte_str += base64.b64encode(buf.getvalue()) - - # Python3 forces all strings to default to unicode, but for raster image - # formats (eg png, jpg), we want to work with bytes. Thus this step is - # needed to ensure compatability for all python versions. - byte_str = byte_str.decode('ascii') - buf.close() - return byte_str - - def get_svg(self, **kwargs): - """ - Get the svg representation of the figure. - Should only be used with svg format. 
- """ - # Make sure format is correct - fmt = kwargs.get('format', mpl_config.get('format')) - if fmt != 'svg': - raise ValueError("get_svg() does not support png or jpg, use svg") - - # For SVG the data string has to be unicode, not bytes - buf = StringIO() - self.print_figure(buf, **kwargs) - svg_str = buf.getvalue() - buf.close() - return svg_str - - def draw_idle(self, *args, **kwargs): - """ - Called when the figure gets updated (eg through a plotting command). - This is overriden to allow open figures to be reshown after they - are updated when mpl_config.get('close') is False. - """ - if not self._is_idle_drawing: - with self._idle_draw_cntx(): - self.draw(*args, **kwargs) - draw_if_interactive() - - -class FigureManagerZInline(FigureManagerBase): - """ - Wrap everything up into a window for the pylab interface - """ - def __init__(self, canvas, num): - FigureManagerBase.__init__(self, canvas, num) - self.fig_id = "figure_{0}".format(uuid.uuid4().hex) - self._shown = False - - def angular_bind(self, **kwargs): - """ - Bind figure data to Zeppelin's Angular Object Registry. - If mpl_config("angular") is True and PY4J is supported, this allows - for the possibility to interactively update a figure from a separate - paragraph without having to display it multiple times. - """ - # This doesn't work for SVG so make sure it's not our format - fmt = kwargs.get('format', mpl_config.get('format')) - if fmt == 'svg': - return - - # Get the figure data as a byte array - src = self.canvas.get_bytes(**kwargs) - - # Flag to determine whether or not to use - # zeppelin's angular display system - angular = mpl_config.get('angular') - - # ZeppelinContext instance (requires PY4J) - context = mpl_config.get('context') - - # Finally we must ensure that automatic closing is set to False, - # as otherwise using the angular display system is pointless - close = mpl_config.get('close') - - # If above conditions are met, bind the figure data to - # the Angular Object Registry. 
- if not close and angular: - if hasattr(context, 'angularBind'): - # Binding is performed through figure ID to ensure this works - # if multiple figures are open - context.angularBind(self.fig_id, src) - - # Zeppelin will automatically replace this value even if it - # is updated from another pargraph thanks to the {{}} notation - src = "{{%s}}" %self.fig_id - else: - warnings.warn("Cannot bind figure to Angular Object Registry. " - "Check if PY4J is installed.") - return src - - def angular_unbind(self): - """ - Unbind figure from angular display system. - """ - context = mpl_config.get('context') - if hasattr(context, 'angularUnbind'): - context.angularUnbind(self.fig_id) - - def destroy(self): - """ - Called when close=True or implicitly by pyplot.close(). - Overriden to automatically clean up the angular object registry. - """ - self.angular_unbind() - - def show(self, **kwargs): - if not self._shown: - zdisplay(self.canvas.figure, **kwargs) - else: - self.canvas.draw_idle() - self.angular_bind(**kwargs) - - self._shown = True - - -def draw_if_interactive(): - """ - If interactive mode is on, this allows for updating properties of - the figure when each new plotting command is called. - """ - manager = Gcf.get_active() - interactive = matplotlib.is_interactive() - angular = mpl_config.get('angular') - - # Don't bother continuing if we aren't in interactive mode - # or if there are no active figures. Also pointless to continue - # in angular mode as we don't want to reshow the figure. 
- if not interactive or angular or manager is None: - return - - # Allow for figure to be reshown if close is false since - # this function call implies that it has been updated - if not mpl_config.get('close'): - manager._shown = False - - -def new_figure_manager(num, *args, **kwargs): - """ - Create a new figure manager instance - """ - # if a main-level app must be created, this (and - # new_figure_manager_given_figure) is the usual place to - # do it -- see backend_wx, backend_wxagg and backend_tkagg for - # examples. Not all GUIs require explicit instantiation of a - # main-level app (egg backend_gtk, backend_gtkagg) for pylab - FigureClass = kwargs.pop('FigureClass', Figure) - thisFig = FigureClass(*args, **kwargs) - return new_figure_manager_given_figure(num, thisFig) - - -def new_figure_manager_given_figure(num, figure): - """ - Create a new figure manager instance for the given figure. - """ - canvas = FigureCanvasZInline(figure) - manager = FigureManagerZInline(canvas, num) - return manager - - -######################################################################## -# -# Backend specific functions -# -######################################################################## - -def zdisplay(fig, **kwargs): - """ - Publishes a matplotlib figure to the notebook paragraph output. - """ - # kwargs can be width or height (in units supported by div tag) - width = kwargs.pop('width', 'auto') - height = kwargs.pop('height', 'auto') - fmt = kwargs.get('format', mpl_config.get('format')) - - # Check if format is supported - supported_formats = mpl_config.get('supported_formats') - if fmt not in supported_formats: - raise ValueError("Unsupported format %s" %fmt) - - # For SVG the data string has to be unicode, not bytes - if fmt == 'svg': - img = fig.canvas.get_svg(**kwargs) - - # This is needed to ensure the SVG image is the correct size. - # We should find a better way to do this... 
- width = '{}px'.format(mpl_config.get('width')) - height = '{}px'.format(mpl_config.get('height')) - else: - # Express the image as bytes - src = fig.canvas.manager.angular_bind(**kwargs) - img = "<img src={src} style='width={width};height:{height}'>" - img = img.format(src=src, width=width, height=height) - - # Print the image to the notebook paragraph via the %html magic - html = "<div style='width:{width};height:{height}'>{img}<div>" - print(html.format(width=width, height=height, img=img)) - -def displayhook(): - """ - Called post paragraph execution if interactive mode is on - """ - if matplotlib.is_interactive(): - show() - -######################################################################## -# -# Now just provide the standard names that backend.__init__ is expecting -# -######################################################################## - -# Create a reference to the show function we are using. This is what actually -# gets called by matplotlib.pyplot.show(). -show = Show() - -# Default FigureCanvas and FigureManager classes to use from the backend -FigureCanvas = FigureCanvasZInline -FigureManager = FigureManagerZInline http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/interpreter/lib/python/mpl_config.py ---------------------------------------------------------------------- diff --git a/interpreter/lib/python/mpl_config.py b/interpreter/lib/python/mpl_config.py deleted file mode 100644 index 5c60893..0000000 --- a/interpreter/lib/python/mpl_config.py +++ /dev/null @@ -1,105 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This module provides utitlites for users to configure the inline plotting -# backend through a PyZeppelinContext instance (eg, through z.configure_mpl()) - -import matplotlib - -def configure(**kwargs): - """ - Generic configure function. - Usage: configure(prop1='foo', prop2='bar', ...) - Currently supported zeppelin-specific properties are: - interactive - If true show all figures without explicit call to show() - via a post-execute hook. - angular - If true, bind figures to angular display system. - close - If true, close all figures once shown. - width, height - Default width / height of the figure in pixels. - fontsize - Font size. - dpi - dpi of the figure. 
- fmt - Figure format - supported_formats - Supported Figure formats () - context - ZeppelinContext instance (requires PY4J) - """ - _config.update(**kwargs) - - # Broadcast relevant changes to matplotlib RC - _on_config_change() - - -def get(key): - """ - Get the configuration info given a key - """ - return _config[key] - - -def _on_config_change(): - # dpi - dpi = _config['dpi'] - - # For older versions of matplotlib, savefig.dpi is not synced with - # figure.dpi by default - matplotlib.rcParams['figure.dpi'] = dpi - if matplotlib.__version__ < '2.0.0': - matplotlib.rcParams['savefig.dpi'] = dpi - - # Width and height - width = float(_config['width']) / dpi - height = float(_config['height']) / dpi - matplotlib.rcParams['figure.figsize'] = (width, height) - - # Font size - fontsize = _config['fontsize'] - matplotlib.rcParams['font.size'] = fontsize - - # Default Figure Format - fmt = _config['format'] - supported_formats = _config['supported_formats'] - if fmt not in supported_formats: - raise ValueError("Unsupported format %s" %fmt) - - if matplotlib.__version__ < '1.2.0': - matplotlib.rcParams.update({'savefig.format': fmt}) - else: - matplotlib.rcParams['savefig.format'] = fmt - - # Interactive mode - interactive = _config['interactive'] - matplotlib.interactive(interactive) - - -def _init_config(): - dpi = matplotlib.rcParams['figure.dpi'] - if matplotlib.__version__ < '1.2.0': - matplotlib.rcParams.update({'savefig.format': 'png'}) - fmt = matplotlib.rcParams['savefig.format'] - width, height = matplotlib.rcParams['figure.figsize'] - fontsize = matplotlib.rcParams['font.size'] - _config['dpi'] = dpi - _config['format'] = fmt - _config['width'] = width*dpi - _config['height'] = height*dpi - _config['fontsize'] = fontsize - _config['close'] = True - _config['interactive'] = matplotlib.is_interactive() - _config['angular'] = False - _config['supported_formats'] = ['png', 'jpg', 'svg'] - _config['context'] = None - - -_config = {} -_init_config() 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/pom.xml ---------------------------------------------------------------------- diff --git a/python/pom.xml b/python/pom.xml index c14d4b1..289755d 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -40,8 +40,6 @@ **/PythonInterpreterPandasSqlTest.java, **/PythonInterpreterMatplotlibTest.java </python.test.exclude> - <pypi.repo.url>https://pypi.python.org/packages</pypi.repo.url> - <python.py4j.repo.folder>/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3</python.py4j.repo.folder> <grpc.version>1.4.0</grpc.version> <plugin.shade.version>2.4.1</plugin.shade.version> </properties> @@ -138,34 +136,11 @@ </plugin> <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>wagon-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <phase>package</phase> - <goals><goal>download-single</goal></goals> - <configuration> - <url>${pypi.repo.url}${python.py4j.repo.folder}</url> - <fromFile>py4j-${python.py4j.version}.zip</fromFile> - <toFile>${project.build.directory}/../../interpreter/python/py4j-${python.py4j.version}.zip</toFile> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> <artifactId>maven-antrun-plugin</artifactId> <version>1.7</version> <executions> <execution> <phase>package</phase> - <configuration> - <target> - <unzip src="${project.build.directory}/../../interpreter/python/py4j-${python.py4j.version}.zip" - dest="${project.build.directory}/../../interpreter/python"/> - </target> - </configuration> <goals> <goal>run</goal> </goals> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java index 4fe50ee..2daa986 100644 
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java @@ -243,16 +243,21 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand private void launchIPythonKernel(int ipythonPort) throws IOException, URISyntaxException { // copy the python scripts to a temp directory, then launch ipython kernel in that folder - File tmpPythonScriptFolder = Files.createTempDirectory("zeppelin_ipython").toFile(); + File pythonWorkDir = Files.createTempDirectory("zeppelin_ipython").toFile(); String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"}; for (String ipythonScript : ipythonScripts) { URL url = getClass().getClassLoader().getResource("grpc/python" + "/" + ipythonScript); - FileUtils.copyURLToFile(url, new File(tmpPythonScriptFolder, ipythonScript)); + FileUtils.copyURLToFile(url, new File(pythonWorkDir, ipythonScript)); } + //TODO(zjffdu) don't do hard code on py4j here + File py4jDestFile = new File(pythonWorkDir, "py4j-src-0.9.2.zip"); + FileUtils.copyURLToFile(getClass().getClassLoader().getResource( + "python/py4j-src-0.9.2.zip"), py4jDestFile); + CommandLine cmd = CommandLine.parse(pythonExecutable); - cmd.addArgument(tmpPythonScriptFolder.getAbsolutePath() + "/ipython_server.py"); + cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py"); cmd.addArgument(ipythonPort + ""); DefaultExecutor executor = new DefaultExecutor(); ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER); @@ -261,20 +266,12 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand executor.setWatchdog(watchDog); if (useBuiltinPy4j) { - String py4jLibPath = null; - if (System.getenv("ZEPPELIN_HOME") != null) { - py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator - + PythonInterpreter.ZEPPELIN_PY4JPATH; - } else { - Path workingPath = Paths.get("..").toAbsolutePath(); - 
py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH; - } if (additionalPythonPath != null) { // put the py4j at the end, because additionalPythonPath may already contain py4j. // e.g. PySparkInterpreter - additionalPythonPath = additionalPythonPath + ":" + py4jLibPath; + additionalPythonPath = additionalPythonPath + ":" + py4jDestFile.getAbsolutePath(); } else { - additionalPythonPath = py4jLibPath; + additionalPythonPath = py4jDestFile.getAbsolutePath(); } } @@ -326,7 +323,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand @Override public void close() throws InterpreterException { if (watchDog != null) { - LOGGER.debug("Kill IPython Process"); + LOGGER.info("Kill IPython Process"); ipythonClient.stop(StopRequest.newBuilder().build()); watchDog.destroyProcess(); gatewayServer.shutdown(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java index 887beb8..8d3e972 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java @@ -31,9 +31,10 @@ import java.util.regex.Pattern; /** * Conda support + * TODO(zjffdu) Add removing conda env */ public class PythonCondaInterpreter extends Interpreter { - Logger logger = LoggerFactory.getLogger(PythonCondaInterpreter.class); + private static Logger logger = LoggerFactory.getLogger(PythonCondaInterpreter.class); public static final String ZEPPELIN_PYTHON = "zeppelin.python"; public static final String CONDA_PYTHON_PATH = "/bin/python"; public static final String DEFAULT_ZEPPELIN_PYTHON = "python"; @@ -145,33 +146,22 @@ public class 
PythonCondaInterpreter extends Interpreter { } } setCurrentCondaEnvName(envName); - python.setPythonCommand(binPath); + python.setPythonExec(binPath); } private void restartPythonProcess() throws InterpreterException { - PythonInterpreter python = getPythonInterpreter(); + logger.debug("Restarting PythonInterpreter"); + Interpreter python = + getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName()); python.close(); python.open(); } protected PythonInterpreter getPythonInterpreter() throws InterpreterException { - LazyOpenInterpreter lazy = null; PythonInterpreter python = null; Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName()); - - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - python = (PythonInterpreter) p; - - if (lazy != null) { - lazy.open(); - } - return python; + return (PythonInterpreter) ((LazyOpenInterpreter)p).getInnerInterpreter(); } public static String runCondaCommandForTextOutput(String title, List<String> commands) @@ -392,27 +382,50 @@ public class PythonCondaInterpreter extends Interpreter { public static String runCommand(List<String> commands) throws IOException, InterruptedException { + logger.info("Starting shell commands: " + StringUtils.join(commands, " ")); + Process process = Runtime.getRuntime().exec(commands.toArray(new String[0])); + StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream()); + StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream()); + errorGobbler.start(); + outputGobbler.start(); + if (process.waitFor() != 0) { + throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " ")); + } + logger.info("Complete shell commands: " + StringUtils.join(commands, " ")); + return outputGobbler.getOutput(); + } - StringBuilder sb = new StringBuilder(); + private static class 
StreamGobbler extends Thread { + InputStream is; + StringBuilder output = new StringBuilder(); - ProcessBuilder builder = new ProcessBuilder(commands); - builder.redirectErrorStream(true); - Process process = builder.start(); - InputStream stdout = process.getInputStream(); - BufferedReader br = new BufferedReader(new InputStreamReader(stdout)); - String line; - while ((line = br.readLine()) != null) { - sb.append(line); - sb.append("\n"); + // reads everything from is until empty. + StreamGobbler(InputStream is) { + this.is = is; } - int r = process.waitFor(); // Let the process finish. - if (r != 0) { - throw new RuntimeException("Failed to execute `" + - StringUtils.join(commands, " ") + "` exited with " + r); + public void run() { + try { + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + String line = null; + long startTime = System.currentTimeMillis(); + while ( (line = br.readLine()) != null) { + output.append(line + "\n"); + // logging per 5 seconds + if ((System.currentTimeMillis() - startTime) > 5000) { + logger.info(line); + startTime = System.currentTimeMillis(); + } + } + } catch (IOException ioe) { + ioe.printStackTrace(); + } } - return sb.toString(); + public String getOutput() { + return output.toString(); + } } public static String runCommand(String ... 
command) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java index 22f6c2e..b528efa 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java @@ -58,7 +58,7 @@ public class PythonDockerInterpreter extends Interpreter { @Override public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { - File pythonScript = new File(getPythonInterpreter().getScriptPath()); + File pythonWorkDir = getPythonInterpreter().getPythonWorkDir(); InterpreterOutput out = context.out; Matcher activateMatcher = activatePattern.matcher(st); @@ -73,26 +73,23 @@ public class PythonDockerInterpreter extends Interpreter { pull(out, image); // mount pythonscript dir - String mountPythonScript = "-v " + - pythonScript.getParentFile().getAbsolutePath() + - ":/_zeppelin_tmp "; + String mountPythonScript = "-v " + pythonWorkDir.getAbsolutePath() + + ":/_python_workdir "; // mount zeppelin dir - String mountPy4j = "-v " + - zeppelinHome.getAbsolutePath() + + String mountPy4j = "-v " + zeppelinHome.getAbsolutePath() + ":/_zeppelin "; // set PYTHONPATH - String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":" + - ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS; + String pythonPath = ".:/_python_workdir/py4j-src-0.9.2.zip:/_python_workdir"; setPythonCommand("docker run -i --rm " + mountPythonScript + mountPy4j + "-e PYTHONPATH=\"" + pythonPath + "\" " + image + " " + - getPythonInterpreter().getPythonBindPath() + " " + - "/_zeppelin_tmp/" + pythonScript.getName()); + 
getPythonInterpreter().getPythonExec() + " " + + "/_python_workdir/zeppelin_python.py"); restartPythonProcess(); out.clear(); return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated"); @@ -108,7 +105,7 @@ public class PythonDockerInterpreter extends Interpreter { public void setPythonCommand(String cmd) throws InterpreterException { PythonInterpreter python = getPythonInterpreter(); - python.setPythonCommand(cmd); + python.setPythonExec(cmd); } private void printUsage(InterpreterOutput out) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index 178f79a..95cfc82 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -1,41 +1,24 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.python; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.net.*; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - +import com.google.common.io.Files; +import com.google.gson.Gson; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; @@ -45,239 +28,233 @@ import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.Interpreter; +import 
org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InvalidHookException; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; -import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import py4j.GatewayServer; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + /** - * Python interpreter for Zeppelin. + * Interpreter for Python, it is the first implementation of interpreter for Python, so with less + * features compared to IPythonInterpreter, but requires less prerequisites than + * IPythonInterpreter, only python installation is required. 
*/ public class PythonInterpreter extends Interpreter implements ExecuteResultHandler { - private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class); - public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py"; - public static final String ZEPPELIN_CONTEXT = "python/zeppelin_context.py"; - public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src"; - public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python"; - public static final String DEFAULT_ZEPPELIN_PYTHON = "python"; - public static final String MAX_RESULT = "zeppelin.python.maxResult"; - - private PythonZeppelinContext zeppelinContext; - private InterpreterContext context; - private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$"); - private String pythonPath; - private int maxResult; - private String py4jLibPath; - private String pythonLibPath; - - private String pythonCommand; + private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class); + private static final int MAX_TIMEOUT_SEC = 10; private GatewayServer gatewayServer; private DefaultExecutor executor; - private int port; - private InterpreterOutputStream outputStream; - private BufferedWriter ins; - private PipedInputStream in; - private ByteArrayOutputStream input; - private String scriptPath; - boolean pythonscriptRunning = false; - private static final int MAX_TIMEOUT_SEC = 10; + private File pythonWorkDir; + protected boolean useBuiltinPy4j = true; - private long pythonPid = 0; + // used to forward output from python process to InterpreterOutput + private InterpreterOutputStream outputStream; + private AtomicBoolean pythonScriptRunning = new AtomicBoolean(false); + private AtomicBoolean pythonScriptInitialized = new AtomicBoolean(false); + private long pythonPid = -1; private IPythonInterpreter iPythonInterpreter; - - Integer statementSetNotifier = new Integer(0); + private BaseZeppelinContext zeppelinContext; + private 
String condaPythonExec; // set by PythonCondaInterpreter public PythonInterpreter(Properties property) { super(property); - try { - File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp")); - scriptPath = scriptFile.getAbsolutePath(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private String workingDir() { - URL myURL = getClass().getProtectionDomain().getCodeSource().getLocation(); - java.net.URI myURI = null; - try { - myURI = myURL.toURI(); - } catch (URISyntaxException e1) - {} - String path = java.nio.file.Paths.get(myURI).toFile().toString(); - return path; } - private void createPythonScript() throws InterpreterException { - File out = new File(scriptPath); - - if (out.exists() && out.isDirectory()) { - throw new InterpreterException("Can't create python script " + out.getAbsolutePath()); + @Override + public void open() throws InterpreterException { + // try IPythonInterpreter first + iPythonInterpreter = getIPythonInterpreter(); + if (getProperty("zeppelin.python.useIPython", "true").equals("true") && + StringUtils.isEmpty( + iPythonInterpreter.checkIPythonPrerequisite(getPythonExec()))) { + try { + iPythonInterpreter.open(); + LOGGER.info("IPython is available, Use IPythonInterpreter to replace PythonInterpreter"); + return; + } catch (Exception e) { + iPythonInterpreter = null; + LOGGER.warn("Fail to open IPythonInterpreter", e); + } } - copyFile(out, ZEPPELIN_PYTHON); - // copy zeppelin_context.py as well - File zOut = new File(out.getParent() + "/zeppelin_context.py"); - copyFile(zOut, ZEPPELIN_CONTEXT); - - logger.info("File {} , {} created", scriptPath, zOut.getAbsolutePath()); - } - - public String getScriptPath() { - return scriptPath; - } + // reset iPythonInterpreter to null as it is not available + iPythonInterpreter = null; + LOGGER.info("IPython is not available, use the native PythonInterpreter"); + // Add matplotlib display hook + InterpreterGroup intpGroup = getInterpreterGroup(); + if 
(intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { + try { + // just for unit test I believe (zjffdu) + registerHook(HookType.POST_EXEC_DEV.getName(), "__zeppelin__._displayhook()"); + } catch (InvalidHookException e) { + throw new InterpreterException(e); + } + } - private void copyFile(File out, String sourceFile) throws InterpreterException { - ClassLoader classLoader = getClass().getClassLoader(); try { - FileOutputStream outStream = new FileOutputStream(out); - IOUtils.copy( - classLoader.getResourceAsStream(sourceFile), - outStream); - outStream.close(); + createGatewayServerAndStartScript(); } catch (IOException e) { - throw new InterpreterException(e); + LOGGER.error("Fail to open PythonInterpreter", e); + throw new InterpreterException("Fail to open PythonInterpreter", e); } } - private void createGatewayServerAndStartScript() - throws UnknownHostException, InterpreterException { - createPythonScript(); - if (System.getenv("ZEPPELIN_HOME") != null) { - py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH; - pythonLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PYTHON_LIBS; - } else { - Path workingPath = Paths.get("..").toAbsolutePath(); - py4jLibPath = workingPath + File.separator + ZEPPELIN_PY4JPATH; - pythonLibPath = workingPath + File.separator + ZEPPELIN_PYTHON_LIBS; - } - - port = findRandomOpenPortOnAllLocalInterfaces(); + // start gateway sever and start python process + private void createGatewayServerAndStartScript() throws IOException { + // start gateway server in JVM side + int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + // use the FQDN as the server address instead of 127.0.0.1 so that python process in docker + // container can also connect to this gateway server. 
+ String serverAddress = getLocalIP(); gatewayServer = new GatewayServer(this, port, GatewayServer.DEFAULT_PYTHON_PORT, - InetAddress.getByName("0.0.0.0"), - InetAddress.getByName("0.0.0.0"), + InetAddress.getByName(serverAddress), + InetAddress.getByName(serverAddress), GatewayServer.DEFAULT_CONNECT_TIMEOUT, GatewayServer.DEFAULT_READ_TIMEOUT, - (List) null); - + (List) null);; gatewayServer.start(); + LOGGER.info("Starting GatewayServer at " + serverAddress + ":" + port); - // Run python shell - String pythonCmd = getPythonCommand(); - CommandLine cmd = CommandLine.parse(pythonCmd); - - if (!pythonCmd.endsWith(".py")) { - // PythonDockerInterpreter set pythoncmd with script - cmd.addArgument(getScriptPath(), false); + // launch python process to connect to the gateway server in JVM side + createPythonScript(); + String pythonExec = getPythonExec(); + CommandLine cmd = CommandLine.parse(pythonExec); + if (!pythonExec.endsWith(".py")) { + // PythonDockerInterpreter set pythonExec with script + cmd.addArgument(pythonWorkDir + "/zeppelin_python.py", false); } + cmd.addArgument(serverAddress, false); cmd.addArgument(Integer.toString(port), false); - cmd.addArgument(getLocalIp(), false); executor = new DefaultExecutor(); - outputStream = new InterpreterOutputStream(LOG); - PipedOutputStream ps = new PipedOutputStream(); - in = null; - try { - in = new PipedInputStream(ps); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - ins = new BufferedWriter(new OutputStreamWriter(ps)); - input = new ByteArrayOutputStream(); - - PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in); + outputStream = new InterpreterOutputStream(LOGGER); + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); executor.setStreamHandler(streamHandler); executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); - try { - Map env = EnvironmentUtils.getProcEnvironment(); - if (!env.containsKey("PYTHONPATH")) { - 
env.put("PYTHONPATH", py4jLibPath + File.pathSeparator + pythonLibPath); - } else { - env.put("PYTHONPATH", env.get("PYTHONPATH") + File.pathSeparator + - py4jLibPath + File.pathSeparator + pythonLibPath); - } + Map<String, String> env = setupPythonEnv(); + LOGGER.info("Launching Python Process Command: " + cmd.getExecutable() + + " " + StringUtils.join(cmd.getArguments(), " ")); + executor.execute(cmd, env, this); + pythonScriptRunning.set(true); + } - logger.info("cmd = {}", cmd.toString()); - executor.execute(cmd, env, this); - pythonscriptRunning = true; - } catch (IOException e) { - throw new InterpreterException(e); + private void createPythonScript() throws IOException { + // set java.io.tmpdir to /tmp on MacOS, because docker can not share the /var folder which will + // cause PythonDockerInterpreter fails. + // https://stackoverflow.com/questions/45122459/docker-mounts-denied-the-paths-are-not-shared- + // from-os-x-and-are-not-known + if (System.getProperty("os.name", "").contains("Mac")) { + System.setProperty("java.io.tmpdir", "/tmp"); + } + this.pythonWorkDir = Files.createTempDir(); + this.pythonWorkDir.deleteOnExit(); + LOGGER.info("Create Python working dir: " + pythonWorkDir.getAbsolutePath()); + copyResourceToPythonWorkDir("python/zeppelin_python.py", "zeppelin_python.py"); + copyResourceToPythonWorkDir("python/zeppelin_context.py", "zeppelin_context.py"); + copyResourceToPythonWorkDir("python/backend_zinline.py", "backend_zinline.py"); + copyResourceToPythonWorkDir("python/mpl_config.py", "mpl_config.py"); + copyResourceToPythonWorkDir("python/py4j-src-0.9.2.zip", "py4j-src-0.9.2.zip"); + } + + protected boolean useIPython() { + return this.iPythonInterpreter != null; + } + + private String getLocalIP() { + // zeppelin.python.gatewayserver_address is only for unit test on travis. + // Because the FQDN would fail unit test on travis ci. 
+ String gatewayserver_address = + properties.getProperty("zeppelin.python.gatewayserver_address"); + if (gatewayserver_address != null) { + return gatewayserver_address; } try { - input.write("import sys, getopt\n".getBytes()); - ins.flush(); - } catch (IOException e) { - throw new InterpreterException(e); + return Inet4Address.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + LOGGER.warn("can't get local IP", e); } + // fall back to loopback addreess + return "127.0.0.1"; } - @Override - public void open() throws InterpreterException { - // try IPythonInterpreter first. If it is not available, we will fallback to the original - // python interpreter implementation. - iPythonInterpreter = getIPythonInterpreter(); - this.zeppelinContext = new PythonZeppelinContext( - getInterpreterGroup().getInterpreterHookRegistry(), - Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000"))); - if (getProperty("zeppelin.python.useIPython", "true").equals("true") && - StringUtils.isEmpty(iPythonInterpreter.checkIPythonPrerequisite(getPythonBindPath()))) { - try { - iPythonInterpreter.open(); - LOG.info("IPython is available, Use IPythonInterpreter to replace PythonInterpreter"); - return; - } catch (Exception e) { - iPythonInterpreter = null; - LOG.warn("Fail to open IPythonInterpreter", e); + private void copyResourceToPythonWorkDir(String srcResourceName, + String dstFileName) throws IOException { + FileOutputStream out = null; + try { + out = new FileOutputStream(pythonWorkDir.getAbsoluteFile() + "/" + dstFileName); + IOUtils.copy( + getClass().getClassLoader().getResourceAsStream(srcResourceName), + out); + } finally { + if (out != null) { + out.close(); } } + } - // reset iPythonInterpreter to null as it is not available - iPythonInterpreter = null; - LOG.info("IPython is not available, use the native PythonInterpreter"); - // Add matplotlib display hook - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup != null && 
intpGroup.getInterpreterHookRegistry() != null) { - try { - registerHook(HookType.POST_EXEC_DEV.getName(), "__zeppelin__._displayhook()"); - } catch (InvalidHookException e) { - throw new InterpreterException(e); - } - } - // Add matplotlib display hook - try { - createGatewayServerAndStartScript(); - } catch (UnknownHostException e) { - throw new InterpreterException(e); + protected Map<String, String> setupPythonEnv() throws IOException { + Map<String, String> env = EnvironmentUtils.getProcEnvironment(); + appendToPythonPath(env, pythonWorkDir.getAbsolutePath()); + if (useBuiltinPy4j) { + appendToPythonPath(env, pythonWorkDir.getAbsolutePath() + "/py4j-src-0.9.2.zip"); } + LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH")); + return env; } - private IPythonInterpreter getIPythonInterpreter() { - LazyOpenInterpreter lazy = null; - IPythonInterpreter ipython = null; - Interpreter p = getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class.getName()); + private void appendToPythonPath(Map<String, String> env, String path) { + if (!env.containsKey("PYTHONPATH")) { + env.put("PYTHONPATH", path); + } else { + env.put("PYTHONPATH", env.get("PYTHONPATH") + ":" + path); + } + } - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); + // Run python script + // Choose python in the order of + // condaPythonExec > zeppelin.python + protected String getPythonExec() { + if (condaPythonExec != null) { + return condaPythonExec; + } else { + return getProperty("zeppelin.python", "python"); } - ipython = (IPythonInterpreter) p; - return ipython; + } + + public File getPythonWorkDir() { + return pythonWorkDir; } @Override @@ -286,54 +263,58 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl iPythonInterpreter.close(); return; } - pythonscriptRunning = false; - pythonScriptInitialized = false; - - try { - ins.flush(); 
- ins.close(); - input.flush(); - input.close(); - } catch (IOException e) { - e.printStackTrace(); - } + pythonScriptRunning.set(false); + pythonScriptInitialized.set(false); executor.getWatchdog().destroyProcess(); - new File(scriptPath).delete(); gatewayServer.shutdown(); - // wait until getStatements stop - synchronized (statementSetNotifier) { - try { - statementSetNotifier.wait(1500); - } catch (InterruptedException e) { - } - statementSetNotifier.notify(); - } + // reset these 2 monitors otherwise when you restart PythonInterpreter it would fails to execute + // python code as these 2 objects are in incorrect state. + statementSetNotifier = new Integer(0); + statementFinishedNotifier = new Integer(0); + } + + private PythonInterpretRequest pythonInterpretRequest = null; + private Integer statementSetNotifier = new Integer(0); + private Integer statementFinishedNotifier = new Integer(0); + private String statementOutput = null; + private boolean statementError = false; + + public void setPythonExec(String pythonExec) { + LOGGER.info("Set Python Command : {}", pythonExec); + this.condaPythonExec = pythonExec; } - PythonInterpretRequest pythonInterpretRequest = null; /** - * Result class of python interpreter + * Request send to Python Daemon */ public class PythonInterpretRequest { public String statements; + public boolean isForCompletion; - public PythonInterpretRequest(String statements) { + public PythonInterpretRequest(String statements, boolean isForCompletion) { this.statements = statements; + this.isForCompletion = isForCompletion; } public String statements() { return statements; } + + public boolean isForCompletion() { + return isForCompletion; + } } + // called by Python Process public PythonInterpretRequest getStatements() { synchronized (statementSetNotifier) { - while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) { + while (pythonInterpretRequest == null) { try { statementSetNotifier.wait(1000); } catch 
(InterruptedException e) { + e.printStackTrace(); } } PythonInterpretRequest req = pythonInterpretRequest; @@ -342,65 +323,78 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl } } - String statementOutput = null; - boolean statementError = false; - Integer statementFinishedNotifier = new Integer(0); - + // called by Python Process public void setStatementsFinished(String out, boolean error) { synchronized (statementFinishedNotifier) { + LOGGER.debug("Setting python statement output: " + out + ", error: " + error); statementOutput = out; statementError = error; statementFinishedNotifier.notify(); } } - boolean pythonScriptInitialized = false; - Integer pythonScriptInitializeNotifier = new Integer(0); - + // called by Python Process public void onPythonScriptInitialized(long pid) { pythonPid = pid; - synchronized (pythonScriptInitializeNotifier) { - pythonScriptInitialized = true; - pythonScriptInitializeNotifier.notifyAll(); + synchronized (pythonScriptInitialized) { + LOGGER.debug("onPythonScriptInitialized is called"); + pythonScriptInitialized.set(true); + pythonScriptInitialized.notifyAll(); } } + // called by Python Process public void appendOutput(String message) throws IOException { + LOGGER.debug("Output from python process: " + message); outputStream.getInterpreterOutput().write(message); } - @Override - public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) - throws InterpreterException { - if (iPythonInterpreter != null) { - return iPythonInterpreter.interpret(cmd, contextInterpreter); - } + // used by subclass such as PySparkInterpreter to set JobGroup before executing spark code + protected void preCallPython(InterpreterContext context) { - if (cmd == null || cmd.isEmpty()) { - return new InterpreterResult(Code.SUCCESS, ""); + } + + // blocking call. 
Send python code to python process and get response + protected void callPython(PythonInterpretRequest request) { + synchronized (statementSetNotifier) { + this.pythonInterpretRequest = request; + statementOutput = null; + statementSetNotifier.notify(); } - this.context = contextInterpreter; + synchronized (statementFinishedNotifier) { + while (statementOutput == null) { + try { + statementFinishedNotifier.wait(1000); + } catch (InterruptedException e) { + } + } + } + } - zeppelinContext.setGui(context.getGui()); - zeppelinContext.setNoteGui(context.getNoteGui()); - zeppelinContext.setInterpreterContext(context); + @Override + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { + if (iPythonInterpreter != null) { + return iPythonInterpreter.interpret(st, context); + } - if (!pythonscriptRunning) { - return new InterpreterResult(Code.ERROR, "python process not running" - + outputStream.toString()); + if (!pythonScriptRunning.get()) { + return new InterpreterResult(Code.ERROR, "python process not running " + + outputStream.toString()); } outputStream.setInterpreterOutput(context.out); - synchronized (pythonScriptInitializeNotifier) { + synchronized (pythonScriptInitialized) { long startTime = System.currentTimeMillis(); - while (pythonScriptInitialized == false - && pythonscriptRunning - && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) { + while (!pythonScriptInitialized.get() + && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) { try { - pythonScriptInitializeNotifier.wait(1000); + LOGGER.info("Wait for PythonScript initialized"); + pythonScriptInitialized.wait(100); } catch (InterruptedException e) { + e.printStackTrace(); } } } @@ -413,59 +407,40 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl throw new InterpreterException(e); } - if (pythonscriptRunning == false) { - // python script failed to initialize and terminated - 
errorMessage.add(new InterpreterResultMessage( - InterpreterResult.Type.TEXT, "failed to start python")); - return new InterpreterResult(Code.ERROR, errorMessage); - } - if (pythonScriptInitialized == false) { + if (!pythonScriptInitialized.get()) { // timeout. didn't get initialized message errorMessage.add(new InterpreterResultMessage( - InterpreterResult.Type.TEXT, "python is not responding")); + InterpreterResult.Type.TEXT, "Failed to initialize Python")); return new InterpreterResult(Code.ERROR, errorMessage); } - pythonInterpretRequest = new PythonInterpretRequest(cmd); - statementOutput = null; - - synchronized (statementSetNotifier) { - statementSetNotifier.notify(); - } + BaseZeppelinContext z = getZeppelinContext(); + z.setInterpreterContext(context); + z.setGui(context.getGui()); + z.setNoteGui(context.getNoteGui()); + InterpreterContext.set(context); - synchronized (statementFinishedNotifier) { - while (statementOutput == null) { - try { - statementFinishedNotifier.wait(1000); - } catch (InterruptedException e) { - } - } - } + preCallPython(context); + callPython(new PythonInterpretRequest(st, false)); if (statementError) { return new InterpreterResult(Code.ERROR, statementOutput); } else { - try { context.out.flush(); } catch (IOException e) { throw new InterpreterException(e); } - return new InterpreterResult(Code.SUCCESS); } } - public InterpreterContext getCurrentInterpreterContext() { - return context; - } - public void interrupt() throws IOException, InterpreterException { if (pythonPid > -1) { - logger.info("Sending SIGINT signal to PID : " + pythonPid); + LOGGER.info("Sending SIGINT signal to PID : " + pythonPid); Runtime.getRuntime().exec("kill -SIGINT " + pythonPid); } else { - logger.warn("Non UNIX/Linux system, close the interpreter"); + LOGGER.warn("Non UNIX/Linux system, close the interpreter"); close(); } } @@ -474,11 +449,12 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl public void 
cancel(InterpreterContext context) throws InterpreterException { if (iPythonInterpreter != null) { iPythonInterpreter.cancel(context); + return; } try { interrupt(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Error", e); } } @@ -495,114 +471,162 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl return 0; } - @Override - public Scheduler getScheduler() { - if (iPythonInterpreter != null) { - return iPythonInterpreter.getScheduler(); - } - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - PythonInterpreter.class.getName() + this.hashCode()); - } @Override public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) + throws InterpreterException { if (iPythonInterpreter != null) { return iPythonInterpreter.completion(buf, cursor, interpreterContext); } - return null; - } + if (buf.length() < cursor) { + cursor = buf.length(); + } + String completionString = getCompletionTargetString(buf, cursor); + String completionCommand = "__zeppelin_completion__.getCompletion('" + completionString + "')"; + LOGGER.debug("completionCommand: " + completionCommand); - public void setPythonCommand(String cmd) { - logger.info("Set Python Command : {}", cmd); - pythonCommand = cmd; - } + pythonInterpretRequest = new PythonInterpretRequest(completionCommand, true); + statementOutput = null; - private String getPythonCommand() { - if (pythonCommand == null) { - return getPythonBindPath(); - } else { - return pythonCommand; + synchronized (statementSetNotifier) { + statementSetNotifier.notify(); } - } - public String getPythonBindPath() { - String path = getProperty("zeppelin.python"); - if (path == null) { - return DEFAULT_ZEPPELIN_PYTHON; - } else { - return path; + String[] completionList = null; + synchronized (statementFinishedNotifier) { + long startTime = System.currentTimeMillis(); + while (statementOutput == null + && 
pythonScriptRunning.get()) { + try { + if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) { + LOGGER.error("Python completion didn't have response for {}sec.", MAX_TIMEOUT_SEC); + break; + } + statementFinishedNotifier.wait(1000); + } catch (InterruptedException e) { + // not working + LOGGER.info("wait drop"); + return new LinkedList<>(); + } + } + if (statementError) { + return new LinkedList<>(); + } + Gson gson = new Gson(); + completionList = gson.fromJson(statementOutput, String[].class); + } + //end code for completion + if (completionList == null) { + return new LinkedList<>(); } - } - private Job getRunningJob(String paragraphId) { - Job foundJob = null; - Collection<Job> jobsRunning = getScheduler().getJobsRunning(); - for (Job job : jobsRunning) { - if (job.getId().equals(paragraphId)) { - foundJob = job; - break; - } + List<InterpreterCompletion> results = new LinkedList<>(); + for (String name: completionList) { + results.add(new InterpreterCompletion(name, name, StringUtils.EMPTY)); } - return foundJob; + return results; } - void bootStrapInterpreter(String file) throws IOException { - BufferedReader bootstrapReader = new BufferedReader( - new InputStreamReader( - PythonInterpreter.class.getResourceAsStream(file))); - String line = null; - String bootstrapCode = ""; + private String getCompletionTargetString(String text, int cursor) { + String[] completionSeqCharaters = {" ", "\n", "\t"}; + int completionEndPosition = cursor; + int completionStartPosition = cursor; + int indexOfReverseSeqPostion = cursor; - while ((line = bootstrapReader.readLine()) != null) { - bootstrapCode += line + "\n"; + String resultCompletionText = ""; + String completionScriptText = ""; + try { + completionScriptText = text.substring(0, cursor); } + catch (Exception e) { + LOGGER.error(e.toString()); + return null; + } + completionEndPosition = completionScriptText.length(); + + String tempReverseCompletionText = new 
StringBuilder(completionScriptText).reverse().toString(); + + for (String seqCharacter : completionSeqCharaters) { + indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter); + + if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) { + completionStartPosition = indexOfReverseSeqPostion; + } - try { - interpret(bootstrapCode, context); - } catch (InterpreterException e) { - throw new IOException(e); } - } - public PythonZeppelinContext getZeppelinContext() { - return zeppelinContext; + if (completionStartPosition == completionEndPosition) { + completionStartPosition = 0; + } + else + { + completionStartPosition = completionEndPosition - completionStartPosition; + } + resultCompletionText = completionScriptText.substring( + completionStartPosition , completionEndPosition); + + return resultCompletionText; } - String getLocalIp() { - try { - return Inet4Address.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - logger.error("can't get local IP", e); + protected IPythonInterpreter getIPythonInterpreter() { + LazyOpenInterpreter lazy = null; + IPythonInterpreter iPython = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); } - // fall back to loopback addreess - return "127.0.0.1"; + iPython = (IPythonInterpreter) p; + return iPython; } - private int findRandomOpenPortOnAllLocalInterfaces() { - Integer port = -1; - try (ServerSocket socket = new ServerSocket(0);) { - port = socket.getLocalPort(); - socket.close(); - } catch (IOException e) { - LOG.error("Can't find an open port", e); + protected BaseZeppelinContext createZeppelinContext() { + return new PythonZeppelinContext( + getInterpreterGroup().getInterpreterHookRegistry(), + 
Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000"))); + } + + public BaseZeppelinContext getZeppelinContext() { + if (zeppelinContext == null) { + zeppelinContext = createZeppelinContext(); } - return port; + return zeppelinContext; } - public int getMaxResult() { - return maxResult; + protected void bootstrapInterpreter(String resourceName) throws IOException { + LOGGER.info("Bootstrap interpreter via " + resourceName); + String bootstrapCode = + IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName)); + try { + InterpreterResult result = interpret(bootstrapCode, InterpreterContext.get()); + if (result.code() != Code.SUCCESS) { + throw new IOException("Fail to run bootstrap script: " + resourceName); + } + } catch (InterpreterException e) { + throw new IOException(e); + } } @Override public void onProcessComplete(int exitValue) { - pythonscriptRunning = false; - logger.info("python process terminated. exit code " + exitValue); + LOGGER.info("python process terminated. 
exit code " + exitValue); + pythonScriptRunning.set(false); + pythonScriptInitialized.set(false); } @Override public void onProcessFailed(ExecuteException e) { - pythonscriptRunning = false; - logger.error("python process failed", e); + LOGGER.error("python process failed", e); + pythonScriptRunning.set(false); + pythonScriptInitialized.set(false); + } + + // Called by Python Process, used for debugging purpose + public void logPythonOutput(String message) { + LOGGER.debug("Python Process Output: " + message); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java index 54984c3..db65960 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java @@ -70,7 +70,7 @@ public class PythonInterpreterPandasSql extends Interpreter { LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY); PythonInterpreter python = getPythonInterpreter(); - python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY); + python.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY); } catch (IOException e) { LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/resources/python/backend_zinline.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/python/backend_zinline.py b/python/src/main/resources/python/backend_zinline.py new file mode 100644 index 0000000..1c84699 --- /dev/null +++ b/python/src/main/resources/python/backend_zinline.py @@ -0,0 +1,317 @@ +# 
Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file provides a static (non-interactive) matplotlib plotting backend +# for zeppelin notebooks for use with the python/pyspark interpreters + +from __future__ import print_function + +import sys +import uuid +import warnings +import base64 +from io import BytesIO +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +import mpl_config +import matplotlib +from matplotlib._pylab_helpers import Gcf +from matplotlib.backends.backend_agg import new_figure_manager, FigureCanvasAgg +from matplotlib.backend_bases import ShowBase, FigureManagerBase +from matplotlib.figure import Figure + +######################################################################## +# +# The following functions and classes are for pylab and implement +# window/figure managers, etc... +# +######################################################################## + +class Show(ShowBase): + """ + A callable object that displays the figures to the screen. 
Valid kwargs + include figure width and height (in units supported by the div tag), block + (allows users to override blocking behavior regardless of whether or not + interactive mode is enabled, currently unused) and close (Implicitly call + matplotlib.pyplot.close('all') with each call to show()). + """ + def __call__(self, close=None, block=None, **kwargs): + if close is None: + close = mpl_config.get('close') + try: + managers = Gcf.get_all_fig_managers() + if not managers: + return + + # Tell zeppelin that the output will be html using the %html magic + # We want to do this only once to avoid seeing "%html" printed + # directly to the outout when multiple figures are displayed from + # one paragraph. + if mpl_config.get('angular'): + print('%angular') + else: + print('%html') + + # Show all open figures + for manager in managers: + manager.show(**kwargs) + finally: + # This closes all the figures if close is set to True. + if close and Gcf.get_all_fig_managers(): + Gcf.destroy_all() + + +class FigureCanvasZInline(FigureCanvasAgg): + """ + The canvas the figure renders into. Calls the draw and print fig + methods, creates the renderers, etc... + """ + def get_bytes(self, **kwargs): + """ + Get the byte representation of the figure. + Should only be used with jpg/png formats. + """ + # Make sure format is correct + fmt = kwargs.get('format', mpl_config.get('format')) + if fmt == 'svg': + raise ValueError("get_bytes() does not support svg, use png or jpg") + + # Express the image as bytes + buf = BytesIO() + self.print_figure(buf, **kwargs) + fmt = fmt.encode() + if sys.version_info >= (3, 4) and sys.version_info < (3, 5): + byte_str = bytes("data:image/%s;base64," %fmt, "utf-8") + else: + byte_str = b"data:image/%s;base64," %fmt + byte_str += base64.b64encode(buf.getvalue()) + + # Python3 forces all strings to default to unicode, but for raster image + # formats (eg png, jpg), we want to work with bytes. 
Thus this step is + # needed to ensure compatability for all python versions. + byte_str = byte_str.decode('ascii') + buf.close() + return byte_str + + def get_svg(self, **kwargs): + """ + Get the svg representation of the figure. + Should only be used with svg format. + """ + # Make sure format is correct + fmt = kwargs.get('format', mpl_config.get('format')) + if fmt != 'svg': + raise ValueError("get_svg() does not support png or jpg, use svg") + + # For SVG the data string has to be unicode, not bytes + buf = StringIO() + self.print_figure(buf, **kwargs) + svg_str = buf.getvalue() + buf.close() + return svg_str + + def draw_idle(self, *args, **kwargs): + """ + Called when the figure gets updated (eg through a plotting command). + This is overriden to allow open figures to be reshown after they + are updated when mpl_config.get('close') is False. + """ + if not self._is_idle_drawing: + with self._idle_draw_cntx(): + self.draw(*args, **kwargs) + draw_if_interactive() + + +class FigureManagerZInline(FigureManagerBase): + """ + Wrap everything up into a window for the pylab interface + """ + def __init__(self, canvas, num): + FigureManagerBase.__init__(self, canvas, num) + self.fig_id = "figure_{0}".format(uuid.uuid4().hex) + self._shown = False + + def angular_bind(self, **kwargs): + """ + Bind figure data to Zeppelin's Angular Object Registry. + If mpl_config("angular") is True and PY4J is supported, this allows + for the possibility to interactively update a figure from a separate + paragraph without having to display it multiple times. 
+ """ + # This doesn't work for SVG so make sure it's not our format + fmt = kwargs.get('format', mpl_config.get('format')) + if fmt == 'svg': + return + + # Get the figure data as a byte array + src = self.canvas.get_bytes(**kwargs) + + # Flag to determine whether or not to use + # zeppelin's angular display system + angular = mpl_config.get('angular') + + # ZeppelinContext instance (requires PY4J) + context = mpl_config.get('context') + + # Finally we must ensure that automatic closing is set to False, + # as otherwise using the angular display system is pointless + close = mpl_config.get('close') + + # If above conditions are met, bind the figure data to + # the Angular Object Registry. + if not close and angular: + if hasattr(context, 'angularBind'): + # Binding is performed through figure ID to ensure this works + # if multiple figures are open + context.angularBind(self.fig_id, src) + + # Zeppelin will automatically replace this value even if it + # is updated from another pargraph thanks to the {{}} notation + src = "{{%s}}" %self.fig_id + else: + warnings.warn("Cannot bind figure to Angular Object Registry. " + "Check if PY4J is installed.") + return src + + def angular_unbind(self): + """ + Unbind figure from angular display system. + """ + context = mpl_config.get('context') + if hasattr(context, 'angularUnbind'): + context.angularUnbind(self.fig_id) + + def destroy(self): + """ + Called when close=True or implicitly by pyplot.close(). + Overriden to automatically clean up the angular object registry. + """ + self.angular_unbind() + + def show(self, **kwargs): + if not self._shown: + zdisplay(self.canvas.figure, **kwargs) + else: + self.canvas.draw_idle() + self.angular_bind(**kwargs) + + self._shown = True + + +def draw_if_interactive(): + """ + If interactive mode is on, this allows for updating properties of + the figure when each new plotting command is called. 
+ """ + manager = Gcf.get_active() + interactive = matplotlib.is_interactive() + angular = mpl_config.get('angular') + + # Don't bother continuing if we aren't in interactive mode + # or if there are no active figures. Also pointless to continue + # in angular mode as we don't want to reshow the figure. + if not interactive or angular or manager is None: + return + + # Allow for figure to be reshown if close is false since + # this function call implies that it has been updated + if not mpl_config.get('close'): + manager._shown = False + + +def new_figure_manager(num, *args, **kwargs): + """ + Create a new figure manager instance + """ + # if a main-level app must be created, this (and + # new_figure_manager_given_figure) is the usual place to + # do it -- see backend_wx, backend_wxagg and backend_tkagg for + # examples. Not all GUIs require explicit instantiation of a + # main-level app (egg backend_gtk, backend_gtkagg) for pylab + FigureClass = kwargs.pop('FigureClass', Figure) + thisFig = FigureClass(*args, **kwargs) + return new_figure_manager_given_figure(num, thisFig) + + +def new_figure_manager_given_figure(num, figure): + """ + Create a new figure manager instance for the given figure. + """ + canvas = FigureCanvasZInline(figure) + manager = FigureManagerZInline(canvas, num) + return manager + + +######################################################################## +# +# Backend specific functions +# +######################################################################## + +def zdisplay(fig, **kwargs): + """ + Publishes a matplotlib figure to the notebook paragraph output. 
+ """ + # kwargs can be width or height (in units supported by div tag) + width = kwargs.pop('width', 'auto') + height = kwargs.pop('height', 'auto') + fmt = kwargs.get('format', mpl_config.get('format')) + + # Check if format is supported + supported_formats = mpl_config.get('supported_formats') + if fmt not in supported_formats: + raise ValueError("Unsupported format %s" %fmt) + + # For SVG the data string has to be unicode, not bytes + if fmt == 'svg': + img = fig.canvas.get_svg(**kwargs) + + # This is needed to ensure the SVG image is the correct size. + # We should find a better way to do this... + width = '{}px'.format(mpl_config.get('width')) + height = '{}px'.format(mpl_config.get('height')) + else: + # Express the image as bytes + src = fig.canvas.manager.angular_bind(**kwargs) + img = "<img src={src} style='width={width};height:{height}'>" + img = img.format(src=src, width=width, height=height) + + # Print the image to the notebook paragraph via the %html magic + html = "<div style='width:{width};height:{height}'>{img}<div>" + print(html.format(width=width, height=height, img=img)) + +def displayhook(): + """ + Called post paragraph execution if interactive mode is on + """ + if matplotlib.is_interactive(): + show() + +######################################################################## +# +# Now just provide the standard names that backend.__init__ is expecting +# +######################################################################## + +# Create a reference to the show function we are using. This is what actually +# gets called by matplotlib.pyplot.show(). 
+show = Show() + +# Default FigureCanvas and FigureManager classes to use from the backend +FigureCanvas = FigureCanvasZInline +FigureManager = FigureManagerZInline http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/resources/python/mpl_config.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/python/mpl_config.py b/python/src/main/resources/python/mpl_config.py new file mode 100644 index 0000000..5c60893 --- /dev/null +++ b/python/src/main/resources/python/mpl_config.py @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This module provides utitlites for users to configure the inline plotting +# backend through a PyZeppelinContext instance (eg, through z.configure_mpl()) + +import matplotlib + +def configure(**kwargs): + """ + Generic configure function. + Usage: configure(prop1='foo', prop2='bar', ...) + Currently supported zeppelin-specific properties are: + interactive - If true show all figures without explicit call to show() + via a post-execute hook. + angular - If true, bind figures to angular display system. + close - If true, close all figures once shown. + width, height - Default width / height of the figure in pixels. 
+ fontsize - Font size. + dpi - dpi of the figure. + fmt - Figure format + supported_formats - Supported Figure formats () + context - ZeppelinContext instance (requires PY4J) + """ + _config.update(**kwargs) + + # Broadcast relevant changes to matplotlib RC + _on_config_change() + + +def get(key): + """ + Get the configuration info given a key + """ + return _config[key] + + +def _on_config_change(): + # dpi + dpi = _config['dpi'] + + # For older versions of matplotlib, savefig.dpi is not synced with + # figure.dpi by default + matplotlib.rcParams['figure.dpi'] = dpi + if matplotlib.__version__ < '2.0.0': + matplotlib.rcParams['savefig.dpi'] = dpi + + # Width and height + width = float(_config['width']) / dpi + height = float(_config['height']) / dpi + matplotlib.rcParams['figure.figsize'] = (width, height) + + # Font size + fontsize = _config['fontsize'] + matplotlib.rcParams['font.size'] = fontsize + + # Default Figure Format + fmt = _config['format'] + supported_formats = _config['supported_formats'] + if fmt not in supported_formats: + raise ValueError("Unsupported format %s" %fmt) + + if matplotlib.__version__ < '1.2.0': + matplotlib.rcParams.update({'savefig.format': fmt}) + else: + matplotlib.rcParams['savefig.format'] = fmt + + # Interactive mode + interactive = _config['interactive'] + matplotlib.interactive(interactive) + + +def _init_config(): + dpi = matplotlib.rcParams['figure.dpi'] + if matplotlib.__version__ < '1.2.0': + matplotlib.rcParams.update({'savefig.format': 'png'}) + fmt = matplotlib.rcParams['savefig.format'] + width, height = matplotlib.rcParams['figure.figsize'] + fontsize = matplotlib.rcParams['font.size'] + _config['dpi'] = dpi + _config['format'] = fmt + _config['width'] = width*dpi + _config['height'] = height*dpi + _config['fontsize'] = fontsize + _config['close'] = True + _config['interactive'] = matplotlib.is_interactive() + _config['angular'] = False + _config['supported_formats'] = ['png', 'jpg', 'svg'] + _config['context'] = 
None + + +_config = {} +_init_config() http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0a97446a/python/src/main/resources/python/py4j-src-0.9.2.zip ---------------------------------------------------------------------- diff --git a/python/src/main/resources/python/py4j-src-0.9.2.zip b/python/src/main/resources/python/py4j-src-0.9.2.zip new file mode 100644 index 0000000..8ceb15c Binary files /dev/null and b/python/src/main/resources/python/py4j-src-0.9.2.zip differ