Rewrite PythonInterpreter.

### What is this PR for?
I've been testing the python interpreter and found at least 4 major issues in the current implementation:

1. Streaming output does not work.
   - https://issues.apache.org/jira/browse/ZEPPELIN-2225
2. "..." is printed when the python code contains indented blocks.
   - https://issues.apache.org/jira/browse/ZEPPELIN-1929
3. matplotlib output is very slow.
   - https://issues.apache.org/jira/browse/ZEPPELIN-1894
   - https://issues.apache.org/jira/browse/ZEPPELIN-1360
4. matplotlib produces unexpected output.
   - https://issues.apache.org/jira/browse/ZEPPELIN-2107

So I changed the python interpreter to use py4j, based on the pyspark interpreter, which should fix the issues above. I am going to recreate the conda and docker support for the python interpreter ASAP.

### What type of PR is it?
Bug Fix | Hot Fix | Refactoring

### How should this be tested?
1. Streaming output does not work.
```
import time
for x in range(0, 5):
    print(x)
    time.sleep(1)
```

2. "..." is printed when the python code contains indented blocks.
```
def fn():
    print("hi")

fn()
```

3. matplotlib output is very slow.
```
import matplotlib
import sys
import matplotlib.pyplot as plt
plt.plot([1,2,3])
```

4. matplotlib produces unexpected output.
```
import matplotlib.pyplot as plt
import matplotlib as mpl

# Make a figure and axes with dimensions as desired.
fig = plt.figure(figsize=(8, 3))
ax1 = fig.add_axes([0.05, 0.80, 0.9, 0.15])
ax2 = fig.add_axes([0.05, 0.475, 0.9, 0.15])
ax3 = fig.add_axes([0.05, 0.15, 0.9, 0.15])

# Set the colormap and norm to correspond to the data for which
# the colorbar will be used.
cmap = mpl.cm.cool
norm = mpl.colors.Normalize(vmin=5, vmax=10)

# ColorbarBase derives from ScalarMappable and puts a colorbar
# in a specified axes, so it has everything needed for a
# standalone colorbar. There are many more kwargs, but the
# following gives a basic continuous colorbar with ticks
# and labels.
cb1 = mpl.colorbar.ColorbarBase(ax1, cmap=cmap,
                                norm=norm,
                                orientation='horizontal')
cb1.set_label('Some Units')

# The second example illustrates the use of a ListedColormap, a
# BoundaryNorm, and extended ends to show the "over" and "under"
# value colors.
cmap = mpl.colors.ListedColormap(['r', 'g', 'b', 'c'])
cmap.set_over('0.25')
cmap.set_under('0.75')

# If a ListedColormap is used, the length of the bounds array must be
# one greater than the length of the color list. The bounds must be
# monotonically increasing.
bounds = [1, 2, 4, 7, 8]
norm = mpl.colors.BoundaryNorm(bounds, cmap.N)
cb2 = mpl.colorbar.ColorbarBase(ax2, cmap=cmap,
                                norm=norm,
                                # to use 'extend', you must
                                # specify two extra boundaries:
                                boundaries=[0] + bounds + [13],
                                extend='both',
                                ticks=bounds,  # optional
                                spacing='proportional',
                                orientation='horizontal')
cb2.set_label('Discrete intervals, some other units')

# The third example illustrates the use of custom length colorbar
# extensions, used on a colorbar with discrete intervals.
cmap = mpl.colors.ListedColormap([[0., .4, 1.], [0., .8, 1.],
                                  [1., .8, 0.], [1., .4, 0.]])
cmap.set_over((1., 0., 0.))
cmap.set_under((0., 0., 1.))

bounds = [-1., -.5, 0., .5, 1.]
norm = mpl.colors.BoundaryNorm(bounds, cmap.N)
cb3 = mpl.colorbar.ColorbarBase(ax3, cmap=cmap,
                                norm=norm,
                                boundaries=[-10] + bounds + [10],
                                extend='both',
                                # Make the length of each extension
                                # the same as the length of the
                                # interior colors:
                                extendfrac='auto',
                                ticks=bounds,
                                spacing='uniform',
                                orientation='horizontal')
cb3.set_label('Custom extension lengths, some other units')

plt.show()
```

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: astroshim <[email protected]>
Author: Lee moon soo <[email protected]>
Author: HyungSung <[email protected]>

Closes #2106 from astroshim/py4jPythonInterpreter and squashes the following commits:

c9b195b [HyungSung] Merge pull request #16 from Leemoonsoo/py4jdocker
e511ebe [Lee moon soo] add PythonDockerInterpreter to interpreter-setting.json
a76b0d8 [Lee moon soo] fix test on python3
2eb5de7 [Lee moon soo] Fix PythonDockerInterpreterTest.java test
9fcf144 [Lee moon soo] Make python docker interpreter work using py4j
8a016c9 [astroshim] Merge branch 'master' into py4jPythonInterpreter
aad7ee8 [astroshim] fix testcase
ac92cdb [astroshim] fix python interpreter testcase
e8570d2 [astroshim] fix ci for pandassql
be5db4d [astroshim] fix pandas sql testcase
f8e19be [astroshim] fix matplotlib testcase
046db88 [astroshim] add testcase
e49ad24 [astroshim] add pandas
60e9820 [astroshim] bug fix about copying library
574bd21 [astroshim] fix interpreter-setting error
a48df58 [astroshim] Merge branch 'master' into py4jPythonInterpreter
3c9585f [astroshim] update interpreter-setting.json
a50179e [astroshim] add conda interpreter
cbbc15c [astroshim] fix py4j path
5ae5120 [astroshim] fix interpreter-setting
f17bff4 [astroshim] fix testcase failure.
af097ac [astroshim] add testcase
c3f5b78 [astroshim] Merge branch 'master' of https://github.com/apache/zeppelin into py4jPythonInterpreter
1395875 [astroshim] removed unnecessary code.
276011e [astroshim] add py4j lib
7304919 [astroshim] initialize python interpreter using py4j

(cherry picked from commit 287ffd50e2f061d5fdbe42e37c8857a79420fa80)
Signed-off-by: Jongyoul Lee <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/89cd5f86
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/89cd5f86
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/89cd5f86

Branch: refs/heads/branch-0.7
Commit: 89cd5f86cfc318bccfee45b0b18ed463ef4ccc81
Parents: e5b19de
Author: astroshim <[email protected]>
Authored: Sat Mar 18 18:24:16 2017 +0900
Committer: Jongyoul Lee <[email protected]>
Committed: Sat Mar 18 19:50:57 2017 +0900

----------------------------------------------------------------------
 python/pom.xml                                  |  37 ++
 .../python/PythonDockerInterpreter.java         |  37 +-
 .../zeppelin/python/PythonInterpreter.java      | 465 ++++++++++++++-----
 .../python/PythonInterpreterPandasSql.java      |  17 +-
 .../apache/zeppelin/python/PythonProcess.java   | 138 ------
 python/src/main/resources/__init__.py           |  14 -
 python/src/main/resources/bootstrap.py          | 234 ----------
 python/src/main/resources/bootstrap_input.py    |  58 ---
 python/src/main/resources/bootstrap_sql.py      |  28 --
 .../src/main/resources/interpreter-setting.json |   2 +-
 .../src/main/resources/python/bootstrap_sql.py  |  29 ++
 .../main/resources/python/zeppelin_python.py    | 276 +++++++++++
 .../python/PythonCondaInterpreterTest.java      |  34 +-
 .../python/PythonDockerInterpreterTest.java     |   9 +-
 .../python/PythonInterpreterMatplotlibTest.java | 146 +++---
 .../python/PythonInterpreterPandasSqlTest.java  |  94 ++--
 .../zeppelin/python/PythonInterpreterTest.java  | 274 +++--------
 ...ythonInterpreterWithPythonInstalledTest.java | 125 -----
 18 files changed, 950 insertions(+), 1067 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/pom.xml
----------------------------------------------------------------------
diff --git a/python/pom.xml b/python/pom.xml
index cd481c1..ff1a361 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -99,6 +99,43 @@
       </plugin>
 
       <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>wagon-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals><goal>download-single</goal></goals>
+            <configuration>
+              <url>https://pypi.python.org/packages/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3</url>
+              <fromFile>py4j-${py4j.version}.zip</fromFile>
+              <toFile>${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip</toFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <configuration>
+              <target>
+                <unzip src="${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip"
+                       dest="${project.build.directory}/../../interpreter/python"/>
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
index c41a3cc..582debd 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
@@ -21,10 +21,8 @@ import org.apache.zeppelin.scheduler.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.*;
+import java.nio.file.Paths;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,6 +35,7 @@ public class PythonDockerInterpreter extends Interpreter {
   Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
   Pattern deactivatePattern = Pattern.compile("deactivate");
   Pattern helpPattern = Pattern.compile("help");
+  private File zeppelinHome;
 
   public PythonDockerInterpreter(Properties property) {
     super(property);
@@ -44,7 +43,11 @@ public class PythonDockerInterpreter extends Interpreter {
 
   @Override
   public void open() {
-
+    if (System.getenv("ZEPPELIN_HOME") != null) {
+      zeppelinHome = new File(System.getenv("ZEPPELIN_HOME"));
+    } else {
+      zeppelinHome = Paths.get("..").toAbsolutePath().toFile();
+    }
   }
 
   @Override
@@ -54,6 +57,7 @@ public class PythonDockerInterpreter extends Interpreter {
 
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
+    File pythonScript = new File(getPythonInterpreter().getScriptPath());
     InterpreterOutput out = context.out;
 
     Matcher activateMatcher = activatePattern.matcher(st);
@@ -66,7 +70,27 @@ public class PythonDockerInterpreter extends Interpreter {
     } else if (activateMatcher.matches()) {
       String image = activateMatcher.group(1);
       pull(out, image);
-      setPythonCommand("docker run -i --rm " + image + " python -iu");
+
+      // mount pythonscript dir
+      String mountPythonScript = "-v "
+          + pythonScript.getParentFile().getAbsolutePath()
+          + ":/_zeppelin_tmp ";
+
+      // mount zeppelin dir
+      String mountPy4j = "-v "
+          + zeppelinHome.getAbsolutePath()
+          + ":/_zeppelin ";
+
+      // set PYTHONPATH
+      String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":"
+          + ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS;
+
+      setPythonCommand("docker run -i --rm "
+          + mountPythonScript
+          + mountPy4j
+          + "-e PYTHONPATH=\"" + pythonPath + "\" "
+          + image
+          + " python /_zeppelin_tmp/" + pythonScript.getName());
       restartPythonProcess();
       out.clear();
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
@@ -79,6 +103,7 @@ public class PythonDockerInterpreter extends Interpreter {
     }
   }
 
+
   public void setPythonCommand(String cmd) {
     PythonInterpreter python = getPythonInterpreter();
     python.setPythonCommand(cmd);


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index d77b59a..f825568 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -18,20 +18,38 @@
 package org.apache.zeppelin.python;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.ServerSocket;
+import java.io.OutputStreamWriter;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.*;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.exec.ExecuteResultHandler;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.PumpStreamHandler;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -39,144 +57,360 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import py4j.GatewayServer;
+import py4j.commands.Command;
 
 /**
  * Python interpreter for Zeppelin.
  */
-public class PythonInterpreter extends Interpreter {
+public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
   private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
-
-  public static final String BOOTSTRAP_PY = "/bootstrap.py";
-  public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
-  public static final String ZEPPELIN_PYTHON = "zeppelin.python";
+  public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
+  public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src";
+  public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python";
   public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
   public static final String MAX_RESULT = "zeppelin.python.maxResult";
 
-  private Integer port;
-  private GatewayServer gatewayServer;
-  private Boolean py4JisInstalled = false;
   private InterpreterContext context;
   private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$");
   private String pythonPath;
   private int maxResult;
+  private String py4jLibPath;
+  private String pythonLibPath;
+
+  private String pythonCommand;
+
+  private GatewayServer gatewayServer;
+  private DefaultExecutor executor;
+  private int port;
+  private InterpreterOutputStream outputStream;
+  private BufferedWriter ins;
+  private PipedInputStream in;
+  private ByteArrayOutputStream input;
+  private String scriptPath;
+  boolean pythonscriptRunning = false;
+  private static final int MAX_TIMEOUT_SEC = 10;
 
-  PythonProcess process = null;
-  private String pythonCommand = null;
+  private long pythonPid = 0;
+
+  Integer statementSetNotifier = new Integer(0);
 
   public PythonInterpreter(Properties property) {
     super(property);
+    try {
+      File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp"));
+      scriptPath = scriptFile.getAbsolutePath();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
   }
 
-  @Override
-  public void open() {
-    // Add matplotlib display hook
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
-      registerHook(HookType.POST_EXEC_DEV, "\nz._displayhook()");
+  private String workingDir() {
+    URL myURL = getClass().getProtectionDomain().getCodeSource().getLocation();
+    java.net.URI myURI = null;
+    try {
+      myURI = myURL.toURI();
+    } catch (URISyntaxException e1)
+    {}
+    String path = java.nio.file.Paths.get(myURI).toFile().toString();
+    return path;
+  }
+
+  private void createPythonScript() {
+    File out = new File(scriptPath);
+
+    if (out.exists() && out.isDirectory()) {
+      throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
     }
-    // Add zeppelin-bundled libs to PYTHONPATH
-    setPythonPath("../interpreter/lib/python:$PYTHONPATH");
-    LOG.info("Starting Python interpreter ---->");
-    LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
+
+    copyFile(out, ZEPPELIN_PYTHON);
+    logger.info("File {} created", scriptPath);
+  }
 
-    maxResult = Integer.valueOf(getProperty(MAX_RESULT));
-    process = getPythonProcess();
+  public String getScriptPath() {
+    return scriptPath;
+  }
 
+  private void copyFile(File out, String sourceFile) {
+    ClassLoader classLoader = getClass().getClassLoader();
     try {
-      process.open();
+      FileOutputStream outStream = new FileOutputStream(out);
+      IOUtils.copy(
+          classLoader.getResourceAsStream(sourceFile),
+          outStream);
+      outStream.close();
     } catch (IOException e) {
-      LOG.error("Can't start the python process", e);
+      throw new InterpreterException(e);
+    }
+  }
+
+  private void createGatewayServerAndStartScript() throws UnknownHostException {
+    createPythonScript();
+    if (System.getenv("ZEPPELIN_HOME") != null) {
+      py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH;
+      pythonLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PYTHON_LIBS;
+    } else {
+      Path workingPath = Paths.get("..").toAbsolutePath();
+      py4jLibPath = workingPath + File.separator + ZEPPELIN_PY4JPATH;
+      pythonLibPath = workingPath + File.separator + ZEPPELIN_PYTHON_LIBS;
+    }
+
+    port = findRandomOpenPortOnAllLocalInterfaces();
+    gatewayServer = new GatewayServer(this,
+        port,
+        GatewayServer.DEFAULT_PYTHON_PORT,
+        InetAddress.getByName("0.0.0.0"),
+        InetAddress.getByName("0.0.0.0"),
+        GatewayServer.DEFAULT_CONNECT_TIMEOUT,
+        GatewayServer.DEFAULT_READ_TIMEOUT,
+        (List) null);
+
+    gatewayServer.start();
+
+    // Run python shell
+    String pythonCmd = getPythonCommand();
+    CommandLine cmd = CommandLine.parse(pythonCmd);
+
+    if (!pythonCmd.endsWith(".py")) {
+      // PythonDockerInterpreter set pythoncmd with script
+      cmd.addArgument(getScriptPath(), false);
     }
+    cmd.addArgument(Integer.toString(port), false);
+    cmd.addArgument(getLocalIp(), false);
 
+    executor = new DefaultExecutor();
+    outputStream = new InterpreterOutputStream(logger);
+    PipedOutputStream ps = new PipedOutputStream();
+    in = null;
     try {
-      LOG.info("python PID : " + process.getPid());
-    } catch (Exception e) {
-      LOG.warn("Can't find python pid process", e);
+      in = new PipedInputStream(ps);
+    } catch (IOException e1) {
+      throw new InterpreterException(e1);
     }
+    ins = new BufferedWriter(new OutputStreamWriter(ps));
+    input = new ByteArrayOutputStream();
 
+    PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
+    executor.setStreamHandler(streamHandler);
+    executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
     try {
-      LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
-      bootStrapInterpreter(BOOTSTRAP_PY);
+      Map env = EnvironmentUtils.getProcEnvironment();
+      if (!env.containsKey("PYTHONPATH")) {
+        env.put("PYTHONPATH", py4jLibPath + File.pathSeparator + pythonLibPath);
+      } else {
+        env.put("PYTHONPATH", env.get("PYTHONPATH") + File.pathSeparator +
+                py4jLibPath + File.pathSeparator + pythonLibPath);
+      }
+
+      logger.info("cmd = {}", cmd.toString());
+      executor.execute(cmd, env, this);
+      pythonscriptRunning = true;
     } catch (IOException e) {
-      LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
+      throw new InterpreterException(e);
     }
 
-    py4JisInstalled = isPy4jInstalled();
-    if (py4JisInstalled) {
-      port = findRandomOpenPortOnAllLocalInterfaces();
-      LOG.info("Py4j gateway port : " + port);
-      try {
-        gatewayServer = new GatewayServer(this, port);
-        gatewayServer.start();
-        LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
-        bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
-      } catch (IOException e) {
-        LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
-            "initialize Zeppelin inputs in python process", e);
-      }
+    try {
+      input.write("import sys, getopt\n".getBytes());
+      ins.flush();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+  }
+
+  @Override
+  public void open() {
+    // Add matplotlib display hook
+    InterpreterGroup intpGroup = getInterpreterGroup();
+    if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
+      registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
+    }
+    // Add matplotlib display hook
+    try {
+      createGatewayServerAndStartScript();
+    } catch (UnknownHostException e) {
+      throw new InterpreterException(e);
     }
   }
 
   @Override
   public void close() {
-    LOG.info("closing Python interpreter <----");
+    pythonscriptRunning = false;
+    pythonScriptInitialized = false;
+
     try {
-      if (process != null) {
-        process.close();
-        process = null;
+      ins.flush();
+      ins.close();
+      input.flush();
+      input.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    executor.getWatchdog().destroyProcess();
+    new File(scriptPath).delete();
+    gatewayServer.shutdown();
+
+    // wait until getStatements stop
+    synchronized (statementSetNotifier) {
+      try {
+        statementSetNotifier.wait(1500);
+      } catch (InterruptedException e) {
       }
-      if (gatewayServer != null) {
-        gatewayServer.shutdown();
+      statementSetNotifier.notify();
+    }
+  }
+
+  PythonInterpretRequest pythonInterpretRequest = null;
+  /**
+   * Result class of python interpreter
+   */
+  public class PythonInterpretRequest {
+    public String statements;
+
+    public PythonInterpretRequest(String statements) {
+      this.statements = statements;
+    }
+
+    public String statements() {
+      return statements;
+    }
+  }
+
+  public PythonInterpretRequest getStatements() {
+    synchronized (statementSetNotifier) {
+      while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) {
+        try {
+          statementSetNotifier.wait(1000);
+        } catch (InterruptedException e) {
+        }
       }
-    } catch (IOException e) {
-      LOG.error("Can't close the interpreter", e);
+      PythonInterpretRequest req = pythonInterpretRequest;
+      pythonInterpretRequest = null;
+      return req;
+    }
+  }
+
+  String statementOutput = null;
+  boolean statementError = false;
+  Integer statementFinishedNotifier = new Integer(0);
+
+  public void setStatementsFinished(String out, boolean error) {
+    synchronized (statementFinishedNotifier) {
+      statementOutput = out;
+      statementError = error;
+      statementFinishedNotifier.notify();
+    }
+  }
+
+  boolean pythonScriptInitialized = false;
+  Integer pythonScriptInitializeNotifier = new Integer(0);
+
+  public void onPythonScriptInitialized(long pid) {
+    pythonPid = pid;
+    synchronized (pythonScriptInitializeNotifier) {
+      pythonScriptInitialized = true;
+      pythonScriptInitializeNotifier.notifyAll();
     }
   }
 
+  public void appendOutput(String message) throws IOException {
+    outputStream.getInterpreterOutput().write(message);
+  }
+
   @Override
   public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
     if (cmd == null || cmd.isEmpty()) {
       return new InterpreterResult(Code.SUCCESS, "");
     }
+
     this.context = contextInterpreter;
-    String output = sendCommandToPython(cmd);
 
-    InterpreterResult result;
-    if (pythonErrorIn(output)) {
-      result = new InterpreterResult(Code.ERROR, output.replaceAll("\\.\\.\\.", ""));
+    if (!pythonscriptRunning) {
+      return new InterpreterResult(Code.ERROR, "python process not running"
+          + outputStream.toString());
+    }
+
+    outputStream.setInterpreterOutput(context.out);
+
+    synchronized (pythonScriptInitializeNotifier) {
+      long startTime = System.currentTimeMillis();
+      while (pythonScriptInitialized == false
+          && pythonscriptRunning
+          && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
+        try {
+          pythonScriptInitializeNotifier.wait(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    List<InterpreterResultMessage> errorMessage;
+    try {
+      context.out.flush();
+      errorMessage = context.out.toInterpreterResultMessage();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+
+    if (pythonscriptRunning == false) {
+      // python script failed to initialize and terminated
+      errorMessage.add(new InterpreterResultMessage(
+          InterpreterResult.Type.TEXT, "failed to start python"));
+      return new InterpreterResult(Code.ERROR, errorMessage);
+    }
+    if (pythonScriptInitialized == false) {
+      // timeout. didn't get initialized message
+      errorMessage.add(new InterpreterResultMessage(
+          InterpreterResult.Type.TEXT, "python is not responding"));
+      return new InterpreterResult(Code.ERROR, errorMessage);
+    }
+
+    pythonInterpretRequest = new PythonInterpretRequest(cmd);
+    statementOutput = null;
+
+    synchronized (statementSetNotifier) {
+      statementSetNotifier.notify();
+    }
+
+    synchronized (statementFinishedNotifier) {
+      while (statementOutput == null) {
+        try {
+          statementFinishedNotifier.wait(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    if (statementError) {
+      return new InterpreterResult(Code.ERROR, statementOutput);
     } else {
-      result = new InterpreterResult(Code.SUCCESS, output);
+
+      try {
+        context.out.flush();
+      } catch (IOException e) {
+        throw new InterpreterException(e);
+      }
+
+      return new InterpreterResult(Code.SUCCESS);
     }
-    return result;
   }
 
-  /**
-   * Checks if there is a syntax error or an exception
-   *
-   * @param output Python interpreter output
-   * @return true if syntax error or exception has happened
-   */
-  private boolean pythonErrorIn(String output) {
-    boolean isError = false;
-    String[] outputMultiline = output.split("\n");
-    Matcher errorMatcher;
-    for (String row : outputMultiline) {
-      errorMatcher = errorInLastLine.matcher(row);
-      if (errorMatcher.find() == true) {
-        isError = true;
-        break;
-      }
+  public void interrupt() throws IOException {
+    if (pythonPid > -1) {
+      logger.info("Sending SIGINT signal to PID : " + pythonPid);
+      Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
+    } else {
+      logger.warn("Non UNIX/Linux system, close the interpreter");
+      close();
     }
-    return isError;
   }
 
   @Override
   public void cancel(InterpreterContext context) {
     try {
-      process.interrupt();
+      interrupt();
     } catch (IOException e) {
-      LOG.error("Can't interrupt the python interpreter", e);
+      e.printStackTrace();
     }
   }
 
@@ -201,28 +435,17 @@ public class PythonInterpreter extends Interpreter {
     return null;
   }
 
-  public void setPythonPath(String pythonPath) {
-    this.pythonPath = pythonPath;
-  }
-
-  public PythonProcess getPythonProcess() {
-    if (process == null) {
-      String binPath = getProperty(ZEPPELIN_PYTHON);
-      if (pythonCommand != null) {
-        binPath = pythonCommand;
-      }
-      return new PythonProcess(binPath, pythonPath);
-    } else {
-      return process;
-    }
-  }
-
   public void setPythonCommand(String cmd) {
+    logger.info("Set Python Command : {}", cmd);
     pythonCommand = cmd;
   }
 
   public String getPythonCommand() {
-    return pythonCommand;
+    if (pythonCommand == null) {
+      return DEFAULT_ZEPPELIN_PYTHON;
+    } else {
+      return pythonCommand;
+    }
   }
 
   private Job getRunningJob(String paragraphId) {
@@ -237,24 +460,6 @@ public class PythonInterpreter extends Interpreter {
     return foundJob;
   }
 
-
-  /**
-   * Sends given text to Python interpreter, blocks and returns the output
-   * @param cmd Python expression text
-   * @return output
-   */
-  String sendCommandToPython(String cmd) {
-    String output = "";
-    LOG.debug("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
-    try {
-      output = process.sendAndGetResult(cmd);
-    } catch (IOException e) {
-      LOG.error("Error when sending commands to python process", e);
-    }
-    LOG.debug("Got : \n" + output);
-    return output;
-  }
-
   void bootStrapInterpreter(String file) throws IOException {
     BufferedReader bootstrapReader = new BufferedReader(
         new InputStreamReader(
@@ -265,24 +470,22 @@ public class PythonInterpreter extends Interpreter {
     while ((line = bootstrapReader.readLine()) != null) {
       bootstrapCode += line + "\n";
     }
-    if (py4JisInstalled && port != null && port != -1) {
-      bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
-    }
-    LOG.info("Bootstrap python interpreter with code from \n " + file);
-    sendCommandToPython(bootstrapCode);
+
+    interpret(bootstrapCode, context);
   }
 
   public GUI getGui() {
     return context.getGui();
   }
 
-  public Integer getPy4jPort() {
-    return port;
-  }
-
-  public Boolean isPy4jInstalled() {
-    String output = sendCommandToPython("\n\nimport py4j\n");
-    return !output.contains("ImportError");
+  String getLocalIp() {
+    try {
+      return Inet4Address.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      logger.error("can't get local IP", e);
+    }
+    // fall back to loopback addreess
+    return "127.0.0.1";
   }
 
   private int findRandomOpenPortOnAllLocalInterfaces() {
@@ -299,4 +502,16 @@ public class PythonInterpreter extends Interpreter {
   public int getMaxResult() {
     return maxResult;
   }
+
+  @Override
+  public void onProcessComplete(int exitValue) {
+    pythonscriptRunning = false;
+    logger.info("python process terminated. exit code " + exitValue);
+  }
+
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+    pythonscriptRunning = false;
+    logger.error("python process failed", e);
+  }
 }


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
index 381066f..6bf1970 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 public class PythonInterpreterPandasSql extends Interpreter {
   private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
 
-  private String SQL_BOOTSTRAP_FILE_PY = "/bootstrap_sql.py";
+  private String SQL_BOOTSTRAP_FILE_PY = "/python/bootstrap_sql.py";
 
   public PythonInterpreterPandasSql(Properties property) {
     super(property);
@@ -64,25 +64,17 @@ public class PythonInterpreterPandasSql extends Interpreter {
   @Override
   public void open() {
     LOG.info("Open Python SQL interpreter instance: {}", this.toString());
+
     try {
       LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
       PythonInterpreter python = getPythonInterpreter();
+
       python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
     } catch (IOException e) {
       LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
     }
   }
 
-  /**
-   * Checks if Python dependencies pandas and pandasql are installed
-   * @return True if they are
-   */
-  boolean isPandasAndPandasqlInstalled() {
-    PythonInterpreter python = getPythonInterpreter();
-    String output = python.sendCommandToPython("\n\nimport pandas\nimport pandasql\n");
-    return !output.contains("ImportError");
-  }
-
   @Override
   public void close() {
     LOG.info("Close Python SQL interpreter instance: {}", this.toString());
@@ -94,7 +86,8 @@ public class PythonInterpreterPandasSql extends Interpreter {
   public InterpreterResult interpret(String st, InterpreterContext context) {
     LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
     Interpreter python = getPythonInterpreter();
-    return python.interpret("z.show(pysqldf('" + st + "'))", context);
+
+    return python.interpret("z.show(pysqldf('" + st + "'))\nz._displayhook()", context);
   }
 
   @Override


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java b/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java
deleted file mode 100644
index 578ffeb..0000000
--- a/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/ - -package org.apache.zeppelin.python; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.io.OutputStream; -import java.lang.reflect.Field; - -/** - * Object encapsulated interactive - * Python process (REPL) used by python interpreter - */ -public class PythonProcess { - private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class); - private static final String STATEMENT_END = "*!?flush reader!?*"; - InputStream stdout; - OutputStream stdin; - PrintWriter writer; - BufferedReader reader; - Process process; - - private String binPath; - private String pythonPath; - private long pid; - - public PythonProcess(String binPath, String pythonPath) { - this.binPath = binPath; - this.pythonPath = pythonPath; - } - - public void open() throws IOException { - ProcessBuilder builder; - boolean hasParams = binPath.split(" ").length > 1; - if (System.getProperty("os.name").toLowerCase().contains("windows")) { - if (hasParams) { - builder = new ProcessBuilder(binPath.split(" ")); - } else { - builder = new ProcessBuilder(binPath, "-iu"); - } - } else { - String cmd; - if (hasParams) { - cmd = binPath; - } else { - cmd = binPath + " -iu"; - } - builder = new ProcessBuilder("bash", "-c", cmd); - if (pythonPath != null) { - builder.environment().put("PYTHONPATH", pythonPath); - } - } - - builder.redirectErrorStream(true); - process = builder.start(); - stdout = process.getInputStream(); - stdin = process.getOutputStream(); - writer = new PrintWriter(stdin, true); - reader = new BufferedReader(new InputStreamReader(stdout)); - try { - pid = findPid(); - } catch (Exception e) { - logger.warn("Can't find python pid process", e); - pid = -1; - } - } - - public void close() throws IOException { - process.destroy(); - reader.close(); - writer.close(); - stdin.close(); - stdout.close(); - 
} - - public void interrupt() throws IOException { - if (pid > -1) { - logger.info("Sending SIGINT signal to PID : " + pid); - Runtime.getRuntime().exec("kill -SIGINT " + pid); - } else { - logger.warn("Non UNIX/Linux system, close the interpreter"); - close(); - } - } - - public String sendAndGetResult(String cmd) throws IOException { - writer.println(cmd); - writer.println(); - writer.println("\"" + STATEMENT_END + "\""); - StringBuilder output = new StringBuilder(); - String line = null; - - while ((line = reader.readLine()) != null && - !line.contains(STATEMENT_END)) { - logger.debug("Read line from python shell : " + line); - output.append(line + "\n"); - } - - return output.toString(); - } - - private long findPid() throws NoSuchFieldException, IllegalAccessException { - long pid = -1; - if (process.getClass().getName().equals("java.lang.UNIXProcess")) { - Field f = process.getClass().getDeclaredField("pid"); - f.setAccessible(true); - pid = f.getLong(process); - f.setAccessible(false); - } - return pid; - } - - public long getPid() { - return pid; - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/__init__.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/__init__.py b/python/src/main/resources/__init__.py deleted file mode 100644 index ec20143..0000000 --- a/python/src/main/resources/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/bootstrap.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/bootstrap.py b/python/src/main/resources/bootstrap.py
deleted file mode 100644
index 0a20a34..0000000
--- a/python/src/main/resources/bootstrap.py
+++ /dev/null
@@ -1,234 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
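The deleted `PythonProcess.sendAndGetResult()` above writes the paragraph to a `python -iu` REPL, then a quoted marker string, and reads stdout until the echoed marker comes back. The Python sketch below mirrors that read loop to make the consequence visible: the caller receives nothing until the whole paragraph has finished, so output cannot stream (ZEPPELIN-2225). The function name `read_until_sentinel` is hypothetical; the marker is the `STATEMENT_END` value from the deleted class. Separately, the interactive REPL prints a continuation prompt for indented blocks, and the deleted `bootstrap.py` blanks only `sys.ps1`, which is a plausible source of the stray "..." (ZEPPELIN-1929).

```python
import io

# Same marker string as STATEMENT_END in the deleted PythonProcess.java.
STATEMENT_END = "*!?flush reader!?*"


def read_until_sentinel(reader, sentinel=STATEMENT_END):
    """Collect REPL output line by line until the echoed sentinel shows up.

    Mirrors the Java loop `while ((line = reader.readLine()) != null &&
    !line.contains(STATEMENT_END))`: nothing is handed back to the caller
    before the sentinel (or EOF) is reached.
    """
    output = []
    for line in iter(reader.readline, ""):  # readline() returns "" at EOF
        if sentinel in line:
            break
        # Java's readLine() drops the newline and the loop re-appends "\n".
        output.append(line.rstrip("\n") + "\n")
    return "".join(output)


# Simulated stdout of `python -iu` after a loop printed 0..2 and the
# REPL echoed the quoted marker.
fake_stdout = io.StringIO("0\n1\n2\n'*!?flush reader!?*'\n")
print(read_until_sentinel(fake_stdout), end="")
```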
- -# PYTHON 2 / 3 compatibility : -# bootstrap.py must be runnable with Python 2 or 3 - -import os -import sys -import signal -import base64 -import warnings -from io import BytesIO -try: - from StringIO import StringIO -except ImportError: - from io import StringIO - -def intHandler(signum, frame): # Set the signal handler - print ("Paragraph interrupted") - raise KeyboardInterrupt() - -signal.signal(signal.SIGINT, intHandler) -# set prompt as empty string so that java side don't need to remove the prompt. -sys.ps1="" - -def help(): - print("""%html - <h2>Python Interpreter help</h2> - - <h3>Python 2 & 3 compatibility</h3> - <p>The interpreter is compatible with Python 2 & 3.<br/> - To change Python version, - change in the interpreter configuration the python to the - desired version (example : python=/usr/bin/python3)</p> - - <h3>Python modules</h3> - <p>The interpreter can use all modules already installed - (with pip, easy_install, etc)</p> - - <h3>Forms</h3> - You must install py4j in order to use - the form feature (pip install py4j) - <h4>Input form</h4> - <pre>print (z.input("f1","defaultValue"))</pre> - <h4>Selection form</h4> - <pre>print(z.select("f2", [("o1","1"), ("o2","2")],2))</pre> - <h4>Checkbox form</h4> - <pre> print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))</pre>') - - <h3>Matplotlib graph</h3> - <div>The interpreter can display matplotlib graph with - the function z.show()</div> - <div> You need to already have matplotlib module installed - to use this functionality !</div><br/> - <pre>import matplotlib.pyplot as plt - plt.figure() - (.. ..) 
- z.show(plt) - plt.close() - </pre> - <div><br/> z.show function can take optional parameters - to adapt graph dimensions (width and height) and format (png or svg)</div> - <div><b>example </b>: - <pre>z.show(plt,width='50px - z.show(plt,height='150px', fmt='svg') </pre></div> - - <h3>Pandas DataFrame</h3> - <div> You need to have Pandas module installed - to use this functionality (pip install pandas) !</div><br/> - <div>The interpreter can visualize Pandas DataFrame - with the function z.show() - <pre> - import pandas as pd - df = pd.read_csv("bank.csv", sep=";") - z.show(df) - </pre></div> - - <h3>SQL over Pandas DataFrame</h3> - <div> You need to have Pandas&Pandasql modules installed - to use this functionality (pip install pandas pandasql) !</div><br/> - - <div>Python interpreter group includes %sql interpreter that can query - Pandas DataFrames using SQL and visualize results using Zeppelin Table Display System - - <pre> - %python - import pandas as pd - df = pd.read_csv("bank.csv", sep=";") - </pre> - <br /> - <pre> - %python.sql - %sql - SELECT * from df LIMIT 5 - </pre> - </div> - """) - - -class PyZeppelinContext(object): - """ If py4j is detected, these class will be override - with the implementation in bootstrap_input.py - """ - errorMsg = "You must install py4j Python module " \ - "(pip install py4j) to use Zeppelin dynamic forms features" - - def __init__(self): - self.max_result = 1000 - self._displayhook = lambda *args: None - self._setup_matplotlib() - - def input(self, name, defaultValue=""): - print(self.errorMsg) - - def select(self, name, options, defaultValue=""): - print(self.errorMsg) - - def checkbox(self, name, options, defaultChecked=[]): - print(self.errorMsg) - - def show(self, p, **kwargs): - if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot": - self.show_matplotlib(p, **kwargs) - elif type(p).__name__ == "DataFrame": # does not play well with sub-classes - # `isinstance(p, DataFrame)` would req `import 
pandas.core.frame.DataFrame` - # and so a dependency on pandas - self.show_dataframe(p, **kwargs) - elif hasattr(p, '__call__'): - p() #error reporting - - def show_dataframe(self, df, show_index=False, **kwargs): - """Pretty prints DF using Table Display System - """ - limit = len(df) > self.max_result - header_buf = StringIO("") - if show_index: - idx_name = str(df.index.name) if df.index.name is not None else "" - header_buf.write(idx_name + "\t") - header_buf.write(str(df.columns[0])) - for col in df.columns[1:]: - header_buf.write("\t") - header_buf.write(str(col)) - header_buf.write("\n") - - body_buf = StringIO("") - rows = df.head(self.max_result).values if limit else df.values - index = df.index.values - for idx, row in zip(index, rows): - if show_index: - body_buf.write("%html <strong>{}</strong>".format(idx)) - body_buf.write("\t") - body_buf.write(str(row[0])) - for cell in row[1:]: - body_buf.write("\t") - body_buf.write(str(cell)) - body_buf.write("\n") - body_buf.seek(0); header_buf.seek(0) - #TODO(bzz): fix it, so it shows red notice, as in Spark - print("%table " + header_buf.read() + body_buf.read()) # + - # ("\n<font color=red>Results are limited by {}.</font>" \ - # .format(self.max_result) if limit else "") - #) - body_buf.close(); header_buf.close() - - def show_matplotlib(self, p, fmt="png", width="auto", height="auto", - **kwargs): - """Matplotlib show function - """ - if fmt == "png": - img = BytesIO() - p.savefig(img, format=fmt) - img_str = b"data:image/png;base64," - img_str += base64.b64encode(img.getvalue().strip()) - img_tag = "<img src={img} style='width={width};height:{height}'>" - # Decoding is necessary for Python 3 compability - img_str = img_str.decode("ascii") - img_str = img_tag.format(img=img_str, width=width, height=height) - elif fmt == "svg": - img = StringIO() - p.savefig(img, format=fmt) - img_str = img.getvalue() - else: - raise ValueError("fmt must be 'png' or 'svg'") - - html = "%html <div 
style='width:{width};height:{height}'>{img}<div>" - print(html.format(width=width, height=height, img=img_str)) - img.close() - - def configure_mpl(self, **kwargs): - import mpl_config - mpl_config.configure(**kwargs) - - def _setup_matplotlib(self): - # If we don't have matplotlib installed don't bother continuing - try: - import matplotlib - except ImportError: - return - # Make sure custom backends are available in the PYTHONPATH - rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) - mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') - if mpl_path not in sys.path: - sys.path.append(mpl_path) - - # Finally check if backend exists, and if so configure as appropriate - try: - matplotlib.use('module://backend_zinline') - import backend_zinline - - # Everything looks good so make config assuming that we are using - # an inline backend - self._displayhook = backend_zinline.displayhook - self.configure_mpl(width=600, height=400, dpi=72, - fontsize=10, interactive=True, format='png') - except ImportError: - # Fall back to Agg if no custom backend installed - matplotlib.use('Agg') - warnings.warn("Unable to load inline matplotlib backend, " - "falling back to Agg") - - -z = PyZeppelinContext() http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/bootstrap_input.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/bootstrap_input.py b/python/src/main/resources/bootstrap_input.py deleted file mode 100644 index 6a0c544..0000000 --- a/python/src/main/resources/bootstrap_input.py +++ /dev/null @@ -1,58 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from py4j.java_gateway import JavaGateway -from py4j.java_gateway import java_import, JavaGateway, GatewayClient - - -client = GatewayClient(port=%PORT%) -gateway = JavaGateway(client) -java_import(gateway.jvm, "org.apache.zeppelin.display.Input") - - -class Py4jZeppelinContext(PyZeppelinContext): - """A context impl that uses Py4j to communicate to JVM - """ - def __init__(self, z): - PyZeppelinContext.__init__(self) - self.z = z - self.paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption - self.javaList = gateway.jvm.java.util.ArrayList - self.max_result = self.z.getMaxResult() - - def input(self, name, defaultValue=""): - return self.z.getGui().input(name, defaultValue) - - def select(self, name, options, defaultValue=""): - javaOptions = gateway.new_array(self.paramOption, len(options)) - i = 0 - for tuple in options: - javaOptions[i] = self.paramOption(tuple[0], tuple[1]) - i += 1 - return self.z.getGui().select(name, defaultValue, javaOptions) - - def checkbox(self, name, options, defaultChecked=[]): - javaOptions = gateway.new_array(self.paramOption, len(options)) - i = 0 - for tuple in options: - javaOptions[i] = self.paramOption(tuple[0], tuple[1]) - i += 1 - javaDefaultCheck = self.javaList() - for check in defaultChecked: - javaDefaultCheck.append(check) - return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions) - - -z = 
Py4jZeppelinContext(gateway.entry_point)

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/bootstrap_sql.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/bootstrap_sql.py b/python/src/main/resources/bootstrap_sql.py
deleted file mode 100644
index d8248c9..0000000
--- a/python/src/main/resources/bootstrap_sql.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
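`show_matplotlib()` (deleted from `bootstrap.py` above and re-added in `zeppelin_python.py`) renders a figure to PNG in memory and inlines it in an `%html` paragraph as a base64 data URI. A minimal standalone sketch of that encoding step follows. `png_to_html` is a hypothetical name, and it takes raw PNG bytes so it runs without matplotlib. It deliberately differs from the diff in two places that look like typos there: the inline style uses `width:` rather than `width=`, and the wrapper is closed with `</div>` instead of `<div>`. Note also that the new `zeppelin_python.py` in this diff does not appear to import `base64`, which `show_matplotlib()` relies on; the sketch imports it explicitly.

```python
import base64


def png_to_html(png_bytes, width="auto", height="auto"):
    """Wrap raw PNG bytes in an %html paragraph using a base64 data URI."""
    # b64encode returns bytes; decode to str (required on Python 3).
    data_uri = "data:image/png;base64," + base64.b64encode(png_bytes).decode("ascii")
    img_tag = "<img src='{img}' style='width:{width};height:{height}'>".format(
        img=data_uri, width=width, height=height)
    html = "%html <div style='width:{width};height:{height}'>{img}</div>"
    return html.format(width=width, height=height, img=img_tag)
```

With matplotlib installed, the bytes would come from saving the figure into an `io.BytesIO` buffer (`plt.savefig(buf, format="png")`) and passing `buf.getvalue()`.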
-
-# Setup SQL over Pandas DataFrames
-# It requires next dependencies to be installed:
-# - pandas
-# - pandasql
-
-from __future__ import print_function
-
-try:
-    from pandasql import sqldf
-    pysqldf = lambda q: sqldf(q, globals())
-except ImportError:
-    pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" +
-        "Make sure 'pandas' and 'pandasql' libraries are installed")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/python/src/main/resources/interpreter-setting.json b/python/src/main/resources/interpreter-setting.json
index af0ba89..bc4d4ec 100644
--- a/python/src/main/resources/interpreter-setting.json
+++ b/python/src/main/resources/interpreter-setting.json
@@ -39,7 +39,7 @@
     "className": "org.apache.zeppelin.python.PythonCondaInterpreter",
     "properties": {
     },
-    "editor":{
+    "editor": {
       "language": "sh",
       "editOnDblClick": false
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/python/bootstrap_sql.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/bootstrap_sql.py b/python/src/main/resources/python/bootstrap_sql.py
new file mode 100644
index 0000000..6f1ae81
--- /dev/null
+++ b/python/src/main/resources/python/bootstrap_sql.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Setup SQL over Pandas DataFrames
+# It requires next dependencies to be installed:
+# - pandas
+# - pandasql
+
+from __future__ import print_function
+
+try:
+    from pandasql import sqldf
+    pysqldf = lambda q: sqldf(q, globals())
+except ImportError:
+    pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" +
+        "Make sure 'pandas' and 'pandasql' libraries are installed")
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/main/resources/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py
new file mode 100644
index 0000000..0a36cba
--- /dev/null
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -0,0 +1,276 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os, sys, getopt, traceback, json, re + +from py4j.java_gateway import java_import, JavaGateway, GatewayClient +from py4j.protocol import Py4JJavaError, Py4JNetworkError +import warnings +import ast +import traceback +import warnings +import signal + +from io import BytesIO +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +# for back compatibility + +class Logger(object): + def __init__(self): + pass + + def write(self, message): + intp.appendOutput(message) + + def reset(self): + pass + + def flush(self): + pass + + +class PyZeppelinContext(object): + """ If py4j is detected, these class will be override + with the implementation in bootstrap_input.py + """ + errorMsg = "You must install py4j Python module " \ + "(pip install py4j) to use Zeppelin dynamic forms features" + + def __init__(self): + self.max_result = 1000 + self._displayhook = lambda *args: None + self._setup_matplotlib() + + def input(self, name, defaultValue=""): + print(self.errorMsg) + + def select(self, name, options, defaultValue=""): + print(self.errorMsg) + + def checkbox(self, name, options, defaultChecked=[]): + print(self.errorMsg) + + def show(self, p, **kwargs): + if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot": + self.show_matplotlib(p, **kwargs) + elif type(p).__name__ == "DataFrame": # does not play well with sub-classes + # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` + # and so a dependency on pandas + self.show_dataframe(p, **kwargs) + elif hasattr(p, '__call__'): + p() #error reporting + + def show_dataframe(self, df, show_index=False, **kwargs): + """Pretty prints DF using Table Display System + """ + limit = len(df) > self.max_result + header_buf = StringIO("") + if show_index: + idx_name = str(df.index.name) if df.index.name is not None else "" + header_buf.write(idx_name + "\t") + 
header_buf.write(str(df.columns[0])) + for col in df.columns[1:]: + header_buf.write("\t") + header_buf.write(str(col)) + header_buf.write("\n") + + body_buf = StringIO("") + rows = df.head(self.max_result).values if limit else df.values + index = df.index.values + for idx, row in zip(index, rows): + if show_index: + body_buf.write("%html <strong>{}</strong>".format(idx)) + body_buf.write("\t") + body_buf.write(str(row[0])) + for cell in row[1:]: + body_buf.write("\t") + body_buf.write(str(cell)) + body_buf.write("\n") + body_buf.seek(0); header_buf.seek(0) + #TODO(bzz): fix it, so it shows red notice, as in Spark + print("%table " + header_buf.read() + body_buf.read()) # + + # ("\n<font color=red>Results are limited by {}.</font>" \ + # .format(self.max_result) if limit else "") + #) + body_buf.close(); header_buf.close() + + def show_matplotlib(self, p, fmt="png", width="auto", height="auto", + **kwargs): + """Matplotlib show function + """ + if fmt == "png": + img = BytesIO() + p.savefig(img, format=fmt) + img_str = b"data:image/png;base64," + img_str += base64.b64encode(img.getvalue().strip()) + img_tag = "<img src={img} style='width={width};height:{height}'>" + # Decoding is necessary for Python 3 compability + img_str = img_str.decode("ascii") + img_str = img_tag.format(img=img_str, width=width, height=height) + elif fmt == "svg": + img = StringIO() + p.savefig(img, format=fmt) + img_str = img.getvalue() + else: + raise ValueError("fmt must be 'png' or 'svg'") + + html = "%html <div style='width:{width};height:{height}'>{img}<div>" + print(html.format(width=width, height=height, img=img_str)) + img.close() + + def configure_mpl(self, **kwargs): + import mpl_config + mpl_config.configure(**kwargs) + + def _setup_matplotlib(self): + # If we don't have matplotlib installed don't bother continuing + try: + import matplotlib + except ImportError: + return + # Make sure custom backends are available in the PYTHONPATH + rootdir = os.environ.get('ZEPPELIN_HOME', 
os.getcwd()) + mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') + if mpl_path not in sys.path: + sys.path.append(mpl_path) + + # Finally check if backend exists, and if so configure as appropriate + try: + matplotlib.use('module://backend_zinline') + import backend_zinline + + # Everything looks good so make config assuming that we are using + # an inline backend + self._displayhook = backend_zinline.displayhook + self.configure_mpl(width=600, height=400, dpi=72, + fontsize=10, interactive=True, format='png') + except ImportError: + # Fall back to Agg if no custom backend installed + matplotlib.use('Agg') + warnings.warn("Unable to load inline matplotlib backend, " + "falling back to Agg") + + +def handler_stop_signals(sig, frame): + sys.exit("Got signal : " + str(sig)) + + +signal.signal(signal.SIGINT, handler_stop_signals) + +host = "127.0.0.1" +if len(sys.argv) >= 3: + host = sys.argv[2] + +client = GatewayClient(address=host, port=int(sys.argv[1])) + +#gateway = JavaGateway(client, auto_convert = True) +gateway = JavaGateway(client) + +intp = gateway.entry_point +intp.onPythonScriptInitialized(os.getpid()) + +z = PyZeppelinContext() +z._setup_matplotlib() + +output = Logger() +sys.stdout = output +#sys.stderr = output + +while True : + req = intp.getStatements() + if req == None: + break + + try: + stmts = req.statements().split("\n") + final_code = [] + + # Get post-execute hooks + try: + global_hook = intp.getHook('post_exec_dev') + except: + global_hook = None + + try: + user_hook = z.getHook('post_exec') + except: + user_hook = None + + nhooks = 0 + for hook in (global_hook, user_hook): + if hook: + nhooks += 1 + + for s in stmts: + if s == None: + continue + + # skip comment + s_stripped = s.strip() + if len(s_stripped) == 0 or s_stripped.startswith("#"): + continue + + final_code.append(s) + + if final_code: + # use exec mode to compile the statements except the last statement, + # so that the last statement's evaluation will be printed to 
stdout + code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1) + + to_run_hooks = [] + if (nhooks > 0): + to_run_hooks = code.body[-nhooks:] + + to_run_exec, to_run_single = (code.body[:-(nhooks + 1)], + [code.body[-(nhooks + 1)]]) + + try: + for node in to_run_exec: + mod = ast.Module([node]) + code = compile(mod, '<stdin>', 'exec') + exec(code) + + for node in to_run_single: + mod = ast.Interactive([node]) + code = compile(mod, '<stdin>', 'single') + exec(code) + + for node in to_run_hooks: + mod = ast.Module([node]) + code = compile(mod, '<stdin>', 'exec') + exec(code) + except: + raise Exception(traceback.format_exc()) + + intp.setStatementsFinished("", False) + except Py4JJavaError: + excInnerError = traceback.format_exc() # format_tb() does not return the inner exception + innerErrorStart = excInnerError.find("Py4JJavaError:") + if innerErrorStart > -1: + excInnerError = excInnerError[innerErrorStart:] + intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True) + except Py4JNetworkError: + # lost connection from gateway server. exit + sys.exit(1) + except: + intp.setStatementsFinished(traceback.format_exc(), True) + + output.reset() http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java index c6d2a84..28d47e0 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java @@ -1,21 +1,23 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. 
See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
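The statement loop at the end of the new `zeppelin_python.py` above replaces the interactive REPL: it parses the whole paragraph into an AST, executes every top-level statement except the last in `'exec'` mode, and compiles the last one in `'single'` mode so that a bare expression is echoed the way the interactive prompt would echo it. Because the paragraph is compiled as one unit, indented blocks no longer go through a continuation prompt. A reduced sketch of that technique, without the post-execute hooks, follows; `run_cell` is a hypothetical name. It passes `type_ignores=[]` when building `ast.Module`, because some Python 3 releases reject a `Module` node built without that field, as `ast.Module([node])` in the diff does.

```python
import ast
import io
from contextlib import redirect_stdout


def run_cell(source, namespace):
    """Execute a paragraph: all top-level statements but the last in 'exec'
    mode, the last one in 'single' mode so a bare expression is echoed."""
    tree = ast.parse(source, "<stdin>", "exec")
    if not tree.body:  # empty paragraph or comments only
        return
    *head, last = tree.body
    for node in head:
        mod = ast.Module(body=[node], type_ignores=[])
        exec(compile(mod, "<stdin>", "exec"), namespace)
    # 'single' mode hands the value of an expression statement to
    # sys.displayhook, which prints its repr() and skips None.
    interactive = ast.Interactive(body=[last])
    exec(compile(interactive, "<stdin>", "single"), namespace)


if __name__ == "__main__":
    ns = {}
    run_cell("def fn():\n    print('hi')\n\nfn()", ns)  # prints: hi
    run_cell("x = 40\nx + 2", ns)                        # prints: 42
```

The indented `fn()` case is the same paragraph the PR uses to reproduce ZEPPELIN-1929.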
+ */ + package org.apache.zeppelin.python; + import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.user.AuthenticationInfo; @@ -132,4 +134,6 @@ public class PythonCondaInterpreterTest { null, new InterpreterOutput(null)); } + + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java index b4d3be2..566b5e0 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java @@ -21,8 +21,11 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.net.Inet4Address; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.HashMap; import java.util.Properties; @@ -46,8 +49,12 @@ public class PythonDockerInterpreterTest { group.put("note", Arrays.asList(python, docker)); python.setInterpreterGroup(group); docker.setInterpreterGroup(group); + doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString()); doReturn(python).when(docker).getPythonInterpreter(); + doReturn("/scriptpath/zeppelin_python.py").when(python).getScriptPath(); + + docker.open(); } @Test @@ -57,7 +64,7 @@ public class PythonDockerInterpreterTest { verify(python, times(1)).open(); verify(python, times(1)).close(); verify(docker, times(1)).pull(any(InterpreterOutput.class), anyString()); - verify(python).setPythonCommand("docker run -i --rm env python -iu"); + verify(python).setPythonCommand(Mockito.matches("docker 
run -i --rm -v.*"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
index 75604dc..8b48b24 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
@@ -1,25 +1,22 @@
-
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.python; -import java.util.*; - import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; @@ -29,34 +26,27 @@ import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; -/** - * In order for this test to work, test env must have installed: - * <ol> - * - <li>Python</li> - * - <li>Matplotlib</li> - * <ol> - * - * Your PYTHONPATH should also contain the directory of the Matplotlib - * backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python. 
- * - * To run manually on such environment, use: - * <code> - * mvn -Dpython.test.exclude='' test -pl python -am - * </code> - */ -public class PythonInterpreterMatplotlibTest { +import static org.junit.Assert.*; +public class PythonInterpreterMatplotlibTest implements InterpreterOutputListener { private InterpreterGroup intpGroup; private PythonInterpreter python; private InterpreterContext context; + InterpreterOutput out; @Before public void setUp() throws Exception { @@ -68,16 +58,27 @@ public class PythonInterpreterMatplotlibTest { python = new PythonInterpreter(p); python.setInterpreterGroup(intpGroup); - python.open(); List<Interpreter> interpreters = new LinkedList<>(); interpreters.add(python); intpGroup.put("note", interpreters); - context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), - new HashMap<String, Object>(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), null, - new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null)); + out = new InterpreterOutput(this); + + context = new InterpreterContext("note", "id", null, "title", "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList<InterpreterContextRunner>(), + out); + python.open(); + } + + @After + public void afterTest() throws IOException { + python.close(); } @Test @@ -85,14 +86,14 @@ public class PythonInterpreterMatplotlibTest { // matplotlib InterpreterResult ret = python.interpret("import matplotlib", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - + // inline backend ret = python.interpret("import backend_zinline", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); } @Test - public void showPlot() { + public void showPlot() throws IOException { // Simple plot test InterpreterResult ret; ret = 
python.interpret("import matplotlib.pyplot as plt", context); @@ -100,15 +101,16 @@ public class PythonInterpreterMatplotlibTest { ret = python.interpret("plt.plot([1, 2, 3])", context); ret = python.interpret("plt.show()", context); - assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType()); - assertTrue(ret.message().get(0).getData().contains("data:image/png;base64")); - assertTrue(ret.message().get(0).getData().contains("<div>")); + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Type.TEXT, out.getOutputAt(0).getType()); + assertEquals(new String(out.getOutputAt(1).toByteArray()), InterpreterResult.Type.HTML, out.getOutputAt(1).getType()); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("data:image/png;base64")); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("<div>")); } @Test // Test for when configuration is set to auto-close figures after show(). - public void testClose() { + public void testClose() throws IOException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; @@ -116,25 +118,33 @@ public class PythonInterpreterMatplotlibTest { ret = python.interpret("z.configure_mpl(interactive=False)", context); ret = python.interpret("plt.plot([1, 2, 3])", context); ret1 = python.interpret("plt.show()", context); - + // Second call to show() should print nothing, and Type should be TEXT. // This is because when close=True, there should be no living instances // of FigureManager, causing show() to return before setting the output // type to HTML. 
     ret = python.interpret("plt.show()", context);
+
+    assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
     assertEquals(0, ret.message().size());
-    
+
     // Now test that new plot is drawn. It should be identical to the
     // previous one.
     ret = python.interpret("plt.plot([1, 2, 3])", context);
+    String msg1 = new String(out.getOutputAt(0).toByteArray());
+    InterpreterResult.Type type1 = out.getOutputAt(0).getType();
+
     ret2 = python.interpret("plt.show()", context);
-    assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
-    assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
+    String msg2 = new String(out.getOutputAt(0).toByteArray());
+    InterpreterResult.Type type2 = out.getOutputAt(0).getType();
+
+    assertEquals(msg1, msg2);
+    assertEquals(type1, type2);
   }
-  
+
   @Test
   // Test for when configuration is set to not auto-close figures after show().
-  public void testNoClose() {
+  public void testNoClose() throws IOException {
     InterpreterResult ret;
     InterpreterResult ret1;
     InterpreterResult ret2;
@@ -142,19 +152,39 @@ public class PythonInterpreterMatplotlibTest {
     ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context);
     ret = python.interpret("plt.plot([1, 2, 3])", context);
     ret1 = python.interpret("plt.show()", context);
-    
+
     // Second call to show() should print nothing, and Type should be HTML.
     // This is because when close=False, there should be living instances
     // of FigureManager, causing show() to set the output
     // type to HTML even though the figure is inactive.
     ret = python.interpret("plt.show()", context);
-    assertEquals("", ret.message().get(0).getData());
-    
+    String msg1 = new String(out.getOutputAt(0).toByteArray());
+    assertNotSame("", msg1);
+
     // Now test that plot can be reshown if it is updated. It should be
     // different from the previous one because it will plot the same line
     // again but in a different color.
     ret = python.interpret("plt.plot([1, 2, 3])", context);
+    msg1 = new String(out.getOutputAt(1).toByteArray());
     ret2 = python.interpret("plt.show()", context);
-    assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
+    String msg2 = new String(out.getOutputAt(1).toByteArray());
+
+    assertNotSame(msg1, msg2);
+  }
+
+
+  @Override
+  public void onUpdateAll(InterpreterOutput out) {
+
+  }
+
+  @Override
+  public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+
+  }
+
+  @Override
+  public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
   }
 }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cd5f86/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
index 86fb22b..f200a0a 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
@@ -21,13 +21,16 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -35,7 +38,10 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,13 +59,14 @@ import org.junit.Test;
  * mvn -Dpython.test.exclude='' test -pl python -am
  * </code>
  */
-public class PythonInterpreterPandasSqlTest {
+public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener {
 
   private InterpreterGroup intpGroup;
   private PythonInterpreterPandasSql sql;
   private PythonInterpreter python;
 
   private InterpreterContext context;
+  InterpreterOutput out;
 
   @Before
   public void setUp() throws Exception {
@@ -78,14 +85,27 @@ public class PythonInterpreterPandasSqlTest {
 
     intpGroup.put("note", Arrays.asList(python, sql));
 
-    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
-        new HashMap<String, Object>(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null), null,
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+    out = new InterpreterOutput(this);
+
+    context = new InterpreterContext("note", "id", null, "title", "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(),
+        out);
+
+    // to make sure python is running.
+    InterpreterResult ret = python.interpret("\n", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
-    //important to be last step
     sql.open();
-    //it depends on python interpreter presence in the same group
+  }
+
+  @After
+  public void afterTest() throws IOException {
+    sql.close();
   }
 
   @Test
@@ -97,23 +117,15 @@ public class PythonInterpreterPandasSqlTest {
   @Test
   public void errorMessageIfDependenciesNotInstalled() {
     InterpreterResult ret;
-    // given
-    ret = python.interpret(
-        "pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
-        context);
-    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
-
-    // when
     ret = sql.interpret("SELECT * from something", context);
 
-    // then
     assertNotNull(ret);
-    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertTrue(ret.message().get(0).getData().contains("dependency is not installed"));
+    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.ERROR, ret.code());
+    assertTrue(ret.message().get(0).getData().contains("no such table: something"));
   }
 
   @Test
-  public void sqlOverTestDataPrintsTable() {
+  public void sqlOverTestDataPrintsTable() throws IOException {
     InterpreterResult ret;
     // given
     //String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
@@ -121,36 +133,34 @@ public class PythonInterpreterPandasSqlTest {
     ret = python.interpret("import numpy as np", context);
     // DataFrame df2 \w test data
     ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
-            "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
+        "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
     assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
     //when
     ret = sql.interpret("select name, age from df2 where age < 40", context);
 
     //then
-    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
-    //assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
-    assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0);
-    assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0);
+    assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType());
+    assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("moon\t33") > 0);
+    assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("park\t34") > 0);
 
     assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
   }
 
   @Test
-  public void badSqlSyntaxFails() {
+  public void badSqlSyntaxFails() throws IOException {
     //when
     InterpreterResult ret = sql.interpret("select wrong syntax", context);
 
     //then
     assertNotNull("Interpreter returned 'null'", ret);
-    //System.out.println("\nInterpreter response: \n" + ret.message());
     assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().get(0).getData().length() > 0);
+    assertTrue(out.toInterpreterResultMessage().size() == 0);
   }
 
   @Test
-  public void showDataFrame() {
+  public void showDataFrame() throws IOException {
     InterpreterResult ret;
     ret = python.interpret("import pandas as pd", context);
     ret = python.interpret("import numpy as np", context);
@@ -165,11 +175,25 @@ public class PythonInterpreterPandasSqlTest {
     ret = python.interpret("z.show(df1, show_index=True)", context);
 
     // then
-    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
-    assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0);
-    assertTrue(ret.message().get(0).getData().indexOf("13") > 0);
-    assertTrue(ret.message().get(0).getData().indexOf("nan") > 0);
-    assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0);
+    assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType());
+    assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("index_name"));
+    assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("nan"));
+    assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("6.7"));
+  }
+
+  @Override
+  public void onUpdateAll(InterpreterOutput out) {
+
+  }
+
+  @Override
+  public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+
+  }
+
+  @Override
+  public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
   }
-}
+}
\ No newline at end of file
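In `PythonDockerInterpreterTest`, the expected command is no longer pinned to the exact string `docker run -i --rm env python -iu`; the test passes the regular expression `docker run -i --rm -v.*` to `Mockito.matches(...)`, which is likely needed because the command now carries a `-v` volume mount derived from the stubbed `getScriptPath()`. The standalone sketch below evaluates that same regex with plain `java.util.regex` and whole-string matching (`Matcher.matches()`, the semantics of `String.matches`), under which the trailing `.*` is what lets everything after `-v` vary. The class name and both sample command lines are hypothetical; the real mount arguments are not shown in this diff.

```java
import java.util.regex.Pattern;

/**
 * Sketch: the regex the updated PythonDockerInterpreterTest hands to
 * Mockito.matches(...), evaluated with whole-string matching.
 * DockerCommandMatch is a hypothetical name, not part of Zeppelin.
 */
public class DockerCommandMatch {

  // Copied from the test: a fixed prefix, then ".*" for the variable mount and the rest.
  private static final Pattern DOCKER_RUN = Pattern.compile("docker run -i --rm -v.*");

  /** True only if the entire command string matches the pattern. */
  public static boolean matchesDockerRun(String command) {
    return DOCKER_RUN.matcher(command).matches();
  }

  public static void main(String[] args) {
    // Hypothetical command with a volume mount; real paths are not shown in the diff.
    String withMount =
        "docker run -i --rm -v /hypothetical/host:/hypothetical/container env python -iu";
    // The exact command string the old test expected.
    String oldCommand = "docker run -i --rm env python -iu";

    System.out.println(matchesDockerRun(withMount));  // true
    System.out.println(matchesDockerRun(oldCommand)); // false
  }
}
```

Matching on a prefix keeps the test independent of where `zeppelin_python.py` is mounted, at the cost of no longer checking the image name or the Python flags.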

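In `testNoClose`, the new assertions `assertNotSame("", msg1)` and `assertNotSame(msg1, msg2)` use JUnit's `assertNotSame`, which compares object references rather than string contents. Because each message is built by a fresh `new String(...)`, the references always differ, so these checks pass whether or not the captured output actually changed. The standalone sketch below (class and method names are illustrative, not from Zeppelin or JUnit) shows the difference with plain `==` and `equals`; a content check would use `assertNotEquals` (JUnit 4.11 and later) or `assertFalse(a.equals(b))`.

```java
import java.nio.charset.StandardCharsets;

/**
 * Sketch: reference identity vs. content equality for strings decoded with
 * new String(byte[], Charset), mirroring how the updated tests build msg1/msg2
 * from out.getOutputAt(i).toByteArray(). SameVsEquals is a hypothetical name.
 */
public class SameVsEquals {

  /** What assertSame/assertNotSame check: a == b. */
  public static boolean sameReference(Object a, Object b) {
    return a == b;
  }

  /** What assertEquals/assertNotEquals check for strings: a.equals(b). */
  public static boolean contentEqual(String a, String b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    // Placeholder bytes standing in for captured plot output; not real interpreter output.
    byte[] captured = "<div>placeholder</div>".getBytes(StandardCharsets.UTF_8);

    // Decoding the same bytes twice, as two separate new String(...) calls would.
    String msg1 = new String(captured, StandardCharsets.UTF_8);
    String msg2 = new String(captured, StandardCharsets.UTF_8);
    System.out.println(sameReference(msg1, msg2)); // false: assertNotSame(msg1, msg2) would pass
    System.out.println(contentEqual(msg1, msg2));  // true: the text is identical

    // The assertNotSame("", msg1) case: even an empty decoded string is a distinct object.
    String empty = new String(new byte[0], StandardCharsets.UTF_8);
    System.out.println(sameReference("", empty)); // false
    System.out.println(contentEqual("", empty));  // true
  }
}
```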