Repository: zeppelin Updated Branches: refs/heads/master fd27014b0 -> b9d6056b7
ZEPPELIN-3364. Followup of ZEPPELIN-3362, improve ZeppelinContext & add more test ### What is this PR for? This PR addresses the remaining issue of ZEPPELIN-3362, and also adds more tests for ZeppelinContext. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3364 ### How should this be tested? * CI passes ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2894 from zjffdu/ZEPPELIN-3364 and squashes the following commits: 891f1e1 [Jeff Zhang] ZEPPELIN-3364. Followup of ZEPPELIN-3362, improve ZeppelinContext & add more test Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b9d6056b Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b9d6056b Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b9d6056b Branch: refs/heads/master Commit: b9d6056b7daa58d3021b70da2cfd8ad21c0ffb6e Parents: fd27014 Author: Jeff Zhang <zjf...@apache.org> Authored: Sun Mar 25 22:13:44 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Mar 30 12:05:47 2018 +0800 ---------------------------------------------------------------------- .../main/resources/python/zeppelin_context.py | 26 ++++----- .../zeppelin/python/IPythonInterpreterTest.java | 24 ++++++++- .../zeppelin/spark/IPySparkInterpreterTest.java | 2 +- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 57 +++++++++++++++++++- 4 files changed, 92 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b9d6056b/python/src/main/resources/python/zeppelin_context.py ---------------------------------------------------------------------- diff --git 
a/python/src/main/resources/python/zeppelin_context.py b/python/src/main/resources/python/zeppelin_context.py index d97a789..d29a16f 100644 --- a/python/src/main/resources/python/zeppelin_context.py +++ b/python/src/main/resources/python/zeppelin_context.py @@ -17,6 +17,7 @@ import os, sys import warnings +import base64 from io import BytesIO @@ -34,7 +35,7 @@ class PyZeppelinContext(object): self.gateway = gateway self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption self.javaList = gateway.jvm.java.util.ArrayList - self.max_result = 1000 + self.max_result = z.getMaxResult() self._displayhook = lambda *args: None self._setup_matplotlib() @@ -129,13 +130,13 @@ class PyZeppelinContext(object): # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` # and so a dependency on pandas self.show_dataframe(p, **kwargs) - elif hasattr(p, '__call__'): - p() #error reporting - + else: + print(str(p)) + def show_dataframe(self, df, show_index=False, **kwargs): """Pretty prints DF using Table Display System """ - limit = len(df) > self.max_result + exceed_limit = len(df) > self.max_result header_buf = StringIO("") if show_index: idx_name = str(df.index.name) if df.index.name is not None else "" @@ -147,7 +148,7 @@ class PyZeppelinContext(object): header_buf.write("\n") body_buf = StringIO("") - rows = df.head(self.max_result).values if limit else df.values + rows = df.head(self.max_result).values if exceed_limit else df.values index = df.index.values for idx, row in zip(index, rows): if show_index: @@ -158,13 +159,12 @@ class PyZeppelinContext(object): body_buf.write("\t") body_buf.write(str(cell)) body_buf.write("\n") - body_buf.seek(0); header_buf.seek(0) - #TODO(bzz): fix it, so it shows red notice, as in Spark - print("%table " + header_buf.read() + body_buf.read()) # + - # ("\n<font color=red>Results are limited by {}.</font>" \ - # .format(self.max_result) if limit else "") - #) + body_buf.seek(0) + header_buf.seek(0) + 
print("%table " + header_buf.read() + body_buf.read()) body_buf.close(); header_buf.close() + if exceed_limit: + print("%html <font color=red>Results are limited by {}.</font>".format(self.max_result)) def show_matplotlib(self, p, fmt="png", width="auto", height="auto", **kwargs): @@ -176,7 +176,7 @@ class PyZeppelinContext(object): img_str = b"data:image/png;base64," img_str += base64.b64encode(img.getvalue().strip()) img_tag = "<img src={img} style='width={width};height:{height}'>" - # Decoding is necessary for Python 3 compability + # Decoding is necessary for Python 3 compatibility img_str = img_str.decode("ascii") img_str = img_tag.format(img=img_str, width=width, height=height) elif fmt == "svg": http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b9d6056b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index 75f1c06..f016f09 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -71,7 +71,9 @@ public class IPythonInterpreterTest { @Test public void testIPython() throws IOException, InterruptedException, InterpreterException { - startInterpreter(new Properties()); + Properties properties = new Properties(); + properties.setProperty("zeppelin.python.maxResult", "3"); + startInterpreter(properties); testInterpreter(interpreter); } @@ -454,9 +456,29 @@ public class IPythonInterpreterTest { result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals(1, 
interpreterResultMessages.size()); assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType()); assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData()); + context = getInterpreterContext(); + result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3,4], 'name':['a','b','c', 'd']})\nz.show(df)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals(2, interpreterResultMessages.size()); + assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType()); + assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData()); + assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType()); + assertEquals("<font color=red>Results are limited by 3.</font>\n", interpreterResultMessages.get(1).getData()); + + // z.show(matplotlib) + context = getInterpreterContext(); + result = interpreter.interpret("import matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)\nz.show(plt)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals(2, interpreterResultMessages.size()); + assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType()); + assertEquals(InterpreterResult.Type.IMG, interpreterResultMessages.get(1).getType()); + // clear output context = getInterpreterContext(); result = interpreter.interpret("import time\nprint(\"Hello\")\ntime.sleep(0.5)\nz.getInterpreterContext().out().clear()\nprint(\"world\")\n", context); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b9d6056b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java index d66f89f..8d08117 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java @@ -64,7 +64,7 @@ public class IPySparkInterpreterTest { p.setProperty("spark.submit.deployMode", "client"); p.setProperty("spark.app.name", "Zeppelin Test"); p.setProperty("zeppelin.spark.useHiveContext", "true"); - p.setProperty("zeppelin.spark.maxResult", "1000"); + p.setProperty("zeppelin.spark.maxResult", "3"); p.setProperty("zeppelin.spark.importImplicit", "true"); p.setProperty("zeppelin.pyspark.python", "python"); p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b9d6056b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index a440419..12d6f14 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -18,7 +18,9 @@ package org.apache.zeppelin.rest; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterNotFoundException; import org.apache.zeppelin.interpreter.InterpreterProperty; import org.apache.zeppelin.interpreter.InterpreterResult; import 
org.apache.zeppelin.interpreter.InterpreterSetting; @@ -367,15 +369,22 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { Paragraph p3 = note.addNewParagraph(anonymous); p3.setText("%spark.pyspark print(z.get(\"var_1\"))"); + // resources across interpreter processes (via DistributedResourcePool) + Paragraph p4 = note.addNewParagraph(anonymous); + p4.setText("%python print(z.get('var_1'))"); + note.run(p1.getId(), true); note.run(p2.getId(), true); note.run(p3.getId(), true); + note.run(p4.getId(), true); assertEquals(Status.FINISHED, p1.getStatus()); assertEquals(Status.FINISHED, p2.getStatus()); assertEquals("hello world\n", p2.getResult().message().get(0).getData()); assertEquals(Status.FINISHED, p3.getStatus()); assertEquals("hello world\n", p3.getResult().message().get(0).getData()); + assertEquals(Status.FINISHED, p4.getStatus()); + assertEquals("hello world\n", p4.getResult().message().get(0).getData()); } @Test @@ -502,8 +511,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" + "print(items[0])"; p.setText(code); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); Iterator<String> formIter = p.settings.getForms().keySet().iterator(); @@ -520,6 +528,51 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { } @Test + public void testAngularObjects() throws IOException, InterpreterNotFoundException { + Note note = ZeppelinServer.notebook.createNote(anonymous); + Paragraph p1 = note.addNewParagraph(anonymous); + + // add local angular object + p1.setText("%spark z.angularBind(\"name\", \"world\")"); + note.run(p1.getId(), true); + assertEquals(Status.FINISHED, p1.getStatus()); + List<AngularObject> angularObjects = + p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null); + assertEquals(1, angularObjects.size()); + 
assertEquals("name", angularObjects.get(0).getName()); + assertEquals("world", angularObjects.get(0).get()); + + // remove local angular object + Paragraph p2 = note.addNewParagraph(anonymous); + p2.setText("%spark z.angularUnbind(\"name\")"); + note.run(p2.getId(), true); + assertEquals(Status.FINISHED, p2.getStatus()); + angularObjects = + p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null); + assertEquals(0, angularObjects.size()); + + // add global angular object + Paragraph p3 = note.addNewParagraph(anonymous); + p3.setText("%spark z.angularBindGlobal(\"name2\", \"world2\")"); + note.run(p3.getId(), true); + assertEquals(Status.FINISHED, p3.getStatus()); + List<AngularObject> globalAngularObjects = + p3.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(null, null); + assertEquals(1, globalAngularObjects.size()); + assertEquals("name2", globalAngularObjects.get(0).getName()); + assertEquals("world2", globalAngularObjects.get(0).get()); + + // remove global angular object + Paragraph p4 = note.addNewParagraph(anonymous); + p4.setText("%spark z.angularUnbindGlobal(\"name2\")"); + note.run(p4.getId(), true); + assertEquals(Status.FINISHED, p4.getStatus()); + globalAngularObjects = + p4.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null); + assertEquals(0, globalAngularObjects.size()); + } + + @Test public void testConfInterpreter() throws IOException { ZeppelinServer.notebook.getInterpreterSettingManager().close(); Note note = ZeppelinServer.notebook.createNote(anonymous);