[ZEPPELIN-554] Streaming interpreter output to front-end

### What is this PR for?
Currently, output from an interpreter is displayed only after paragraph execution completes. It would be useful to stream the output to the front-end during execution.
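As a rough illustration of what streaming during execution means, here is a minimal sketch of an output stream that hands each completed line to a listener as soon as its newline arrives. The class name `LineStreamingOutput` and the `Consumer<String>` callback are hypothetical, chosen only for this example; they are not Zeppelin's actual classes, and the real implementation may differ.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for a streaming output (not a Zeppelin class):
 * buffers bytes and passes each completed line to a listener immediately.
 */
class LineStreamingOutput extends OutputStream {
  // Bytes written since the last emitted line.
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
  // Assumed callback; in a real system this might push text over a websocket.
  private final Consumer<String> onLine;

  LineStreamingOutput(Consumer<String> onLine) {
    this.onLine = onLine;
  }

  @Override
  public synchronized void write(int b) throws IOException {
    pending.write(b);
    if (b == '\n') {
      // A full line is available: emit it without waiting for completion.
      emitPending();
    }
  }

  /** Emits a trailing partial line, if there is one. */
  @Override
  public synchronized void flush() throws IOException {
    if (pending.size() > 0) {
      emitPending();
    }
  }

  private void emitPending() {
    String line = new String(pending.toByteArray(), StandardCharsets.UTF_8);
    pending.reset();
    onLine.accept(line);
  }
}
```

With this sketch, a loop that prints one line per second would trigger the callback once per second, instead of delivering all lines together when the loop ends. Splitting on the newline byte is safe for UTF-8 text, since that byte never occurs inside a multi-byte character.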
Previous work #593 injects an InterpreterOutput stream object into Interpreter. This PR is based on #593 and streams the data from InterpreterOutput to the front-end. This implementation only streams %text output; other output types (%html, %angular, %table) are not streamed to the front-end. While this PR keeps backward compatibility, interpreters that want to use this feature need to be modified to write output into `InterpreterOutput` instead of returning it via `InterpreterResult`. This PR includes the modification of SparkInterpreter to use InterpreterOutput.

### What type of PR is it?
Feature

### Todos

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-554

### How should this be tested?
Run code such as the following using the Spark interpreter
```
(1 to 10).foreach{ i=>
  Thread.sleep(1000)
  println("Hello " + i)
}
```

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? 
no Author: Lee moon soo <[email protected]> Closes #611 from Leemoonsoo/output_stream_frontend and squashes the following commits: 53e2bb4 [Lee moon soo] Not persist on every append dedae0d [Lee moon soo] Remove debug lines 8251fb4 [Lee moon soo] Fix syntax and style 9c9c8fd [Lee moon soo] update test 18215a3 [Lee moon soo] fix style f7e6a4d [Lee moon soo] Fix syntax error d29cfbf [Lee moon soo] workaround jshint 07b3e1a [Lee moon soo] Handle clear output correctly bc6262e [Lee moon soo] Make PysparkInterpreter stream output 6d9cc51 [Lee moon soo] Pass InterpreterOutput to SparkILoop b68180e [Lee moon soo] Add InterpreterOutput on spark interpreter unitest 626ad48 [Lee moon soo] Add license header 846015b [Lee moon soo] Update scalding 37d6920 [Lee moon soo] Handle display system directive correctly e278e84 [Lee moon soo] Clear output correctly 479b836 [Lee moon soo] Add test c01df62 [Lee moon soo] Connect Spark interpreter Console.out to outputstream 8a1223f [Lee moon soo] Handle update output correctly e7a9b37 [Lee moon soo] Delayed persist 2060c1e [Lee moon soo] Clear before render text 786c978 [Lee moon soo] update paragraph object after witing to outputstream 258ff38 [Lee moon soo] Barely working 6f607f7 [Lee moon soo] Add newline listener 89d9798 [Lee moon soo] Render text output line by line a42e4ff [Lee moon soo] Update test fb5e7b5 [Lee moon soo] Update test 0f60b54 [Lee moon soo] Update test a07d7db [Lee moon soo] Implement InterpreterResult.toString 1f419b6 [Lee moon soo] Add InterpreterOutput c91f498 [Lee moon soo] prepend interpreteroutputstream to interpreter result Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/5ec59a81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/5ec59a81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/5ec59a81 Branch: refs/heads/master Commit: 5ec59a81b2fda2fb65d4075e0672930b769f41d2 
Parents: dbdaf84 Author: Lee moon soo <[email protected]> Authored: Sat Jan 16 11:04:09 2016 -0800 Committer: Lee moon soo <[email protected]> Committed: Wed Jan 20 15:39:21 2016 -0800 ---------------------------------------------------------------------- .../zeppelin/flink/FlinkInterpreterTest.java | 2 +- .../zeppelin/hive/HiveInterpreterTest.java | 12 +- .../zeppelin/ignite/IgniteInterpreterTest.java | 2 +- .../ignite/IgniteSqlInterpreterTest.java | 2 +- .../scalding/ScaldingInterpreterTest.java | 2 +- .../zeppelin/spark/PySparkInterpreter.java | 36 ++- .../apache/zeppelin/spark/SparkInterpreter.java | 32 +-- .../zeppelin/spark/SparkOutputStream.java | 75 ++++++ .../apache/zeppelin/spark/ZeppelinContext.java | 18 +- .../main/resources/python/zeppelin_pyspark.py | 7 +- .../zeppelin/spark/DepInterpreterTest.java | 2 +- .../zeppelin/spark/SparkInterpreterTest.java | 23 +- .../zeppelin/spark/SparkSqlInterpreterTest.java | 17 +- .../interpreter/InterpreterContext.java | 5 +- .../zeppelin/interpreter/InterpreterOutput.java | 249 +++++++++++++++++++ .../InterpreterOutputChangeListener.java | 27 ++ .../InterpreterOutputChangeWatcher.java | 140 +++++++++++ .../interpreter/InterpreterOutputListener.java | 34 +++ .../zeppelin/interpreter/InterpreterResult.java | 4 + .../interpreter/remote/RemoteInterpreter.java | 27 +- .../remote/RemoteInterpreterEventPoller.java | 26 +- .../remote/RemoteInterpreterProcess.java | 9 +- .../RemoteInterpreterProcessListener.java | 25 ++ .../remote/RemoteInterpreterServer.java | 68 ++++- .../thrift/RemoteInterpreterContext.java | 2 +- .../thrift/RemoteInterpreterEvent.java | 2 +- .../thrift/RemoteInterpreterEventType.java | 8 +- .../thrift/RemoteInterpreterResult.java | 2 +- .../thrift/RemoteInterpreterService.java | 2 +- .../main/thrift/RemoteInterpreterService.thrift | 4 +- .../interpreter/InterpreterContextTest.java | 2 +- .../InterpreterOutputChangeWatcherTest.java | 109 ++++++++ .../interpreter/InterpreterOutputTest.java | 127 ++++++++++ 
.../interpreter/InterpreterResultTest.java | 5 + .../remote/RemoteAngularObjectTest.java | 15 +- .../RemoteInterpreterOutputTestStream.java | 146 +++++++++++ .../remote/RemoteInterpreterProcessTest.java | 2 +- .../remote/RemoteInterpreterTest.java | 174 +++++-------- .../mock/MockInterpreterOutputStream.java | 97 ++++++++ .../zeppelin/scheduler/RemoteSchedulerTest.java | 32 +-- .../apache/zeppelin/server/ZeppelinServer.java | 3 +- .../org/apache/zeppelin/socket/Message.java | 2 + .../apache/zeppelin/socket/NotebookServer.java | 82 +++++- .../notebook/paragraph/paragraph-results.html | 12 +- .../notebook/paragraph/paragraph.controller.js | 65 ++++- .../websocketEvents/websocketEvents.factory.js | 4 + .../interpreter/InterpreterFactory.java | 13 +- .../zeppelin/notebook/JobListenerFactory.java | 4 +- .../java/org/apache/zeppelin/notebook/Note.java | 77 ++++-- .../org/apache/zeppelin/notebook/Paragraph.java | 55 +++- .../zeppelin/notebook/ParagraphJobListener.java | 29 +++ .../interpreter/InterpreterFactoryTest.java | 6 +- .../notebook/NoteInterpreterLoaderTest.java | 2 +- .../apache/zeppelin/notebook/NotebookTest.java | 18 +- .../notebook/repo/NotebookRepoSyncTest.java | 20 +- .../notebook/repo/VFSNotebookRepoTest.java | 9 +- 56 files changed, 1672 insertions(+), 302 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 3168f04..9a61be6 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -40,7 +40,7 @@ public class FlinkInterpreterTest { Properties p = 
new Properties(); flink = new FlinkInterpreter(p); flink.open(); - context = new InterpreterContext(null, null, null, null, null, null, null, null); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null); } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java ---------------------------------------------------------------------- diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java index c22080d..c86fcf3 100644 --- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java +++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java @@ -79,9 +79,9 @@ public class HiveInterpreterTest { HiveInterpreter t = new HiveInterpreter(properties); t.open(); - assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME")); + assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME")); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message()); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message()); } @Test @@ -101,7 +101,7 @@ public class HiveInterpreterTest { t.open(); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", - t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message()); + t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message()); } @Test @@ -117,13 +117,13 @@ public class HiveInterpreterTest { t.open(); InterpreterResult interpreterResult = - t.interpret("select * from 
test_table", new InterpreterContext("", "1", "","", null,null,null,null)); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message()); t.getConnection("default").close(); interpreterResult = - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message()); } @@ -139,7 +139,7 @@ public class HiveInterpreterTest { HiveInterpreter t = new HiveInterpreter(properties); t.open(); - InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null); + InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null); //simple select test InterpreterResult result = t.interpret("select * from test_table", interpreterContext); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java index f46b049..cf98083 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java @@ -40,7 +40,7 @@ public class IgniteInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null); private IgniteInterpreter 
intp; private Ignite ignite; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java index fb93ad5..a6dcc66 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null); private Ignite ignite; private IgniteSqlInterpreter intp; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java ---------------------------------------------------------------------- diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 7a753fa..606d4d9 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -65,7 +65,7 @@ public class ScaldingInterpreterTest { context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry( intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>()); + new LinkedList<InterpreterContextRunner>(), null); } @After 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 8c4ba87..c5441ab 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private GatewayServer gatewayServer; private DefaultExecutor executor; private int port; - private ByteArrayOutputStream outputStream; - private ByteArrayOutputStream errStream; + private SparkOutputStream outputStream; private BufferedWriter ins; private PipedInputStream in; private ByteArrayOutputStream input; @@ -173,7 +172,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand cmd.addArgument(Integer.toString(port), false); cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); executor = new DefaultExecutor(); - outputStream = new ByteArrayOutputStream(); + outputStream = new SparkOutputStream(); PipedOutputStream ps = new PipedOutputStream(); in = null; try { @@ -274,7 +273,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand statementError = error; statementFinishedNotifier.notify(); } - } boolean pythonScriptInitialized = false; @@ -287,6 +285,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } + public void appendOutput(String message) throws IOException { + outputStream.getInterpreterOutput().write(message); + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { SparkInterpreter sparkInterpreter = getSparkInterpreter(); @@ -300,7 +302,7 @@ 
public class PySparkInterpreter extends Interpreter implements ExecuteResultHand + outputStream.toString()); } - outputStream.reset(); + outputStream.setInterpreterOutput(context.out); synchronized (pythonScriptInitializeNotifier) { long startTime = System.currentTimeMillis(); @@ -314,15 +316,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } + String errorMessage = ""; + try { + context.out.flush(); + errorMessage = new String(context.out.toByteArray()); + } catch (IOException e) { + throw new InterpreterException(e); + } + + if (pythonscriptRunning == false) { // python script failed to initialize and terminated return new InterpreterResult(Code.ERROR, "failed to start pyspark" - + outputStream.toString()); + + errorMessage); } if (pythonScriptInitialized == false) { // timeout. didn't get initialized message return new InterpreterResult(Code.ERROR, "pyspark is not responding " - + outputStream.toString()); + + errorMessage); } if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) { @@ -352,7 +363,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand if (statementError) { return new InterpreterResult(Code.ERROR, statementOutput); } else { - return new InterpreterResult(Code.SUCCESS, statementOutput); + + try { + context.out.flush(); + } catch (IOException e) { + throw new InterpreterException(e); + } + + return new InterpreterResult(Code.SUCCESS); } } @@ -389,8 +407,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return new LinkedList<String>(); } - outputStream.reset(); - pythonInterpretRequest = new PythonInterpretRequest(completionCommand, ""); statementOutput = null; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index d975791..7ee6d7c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -17,9 +17,7 @@ package org.apache.zeppelin.spark; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.PrintStream; import java.io.PrintWriter; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -41,7 +39,6 @@ import org.apache.spark.repl.SparkJLineCompletion; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; -import org.apache.spark.scheduler.SparkListener; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; @@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter { private SparkILoop interpreter; private SparkIMain intp; private SparkContext sc; - private ByteArrayOutputStream out; + private SparkOutputStream out; private SQLContext sqlc; private SparkDependencyResolver dep; private SparkJLineCompletion completor; @@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter { public SparkInterpreter(Properties property) { super(property); - out = new ByteArrayOutputStream(); + out = new SparkOutputStream(); } public SparkInterpreter(Properties property, SparkContext sc) { @@ -452,10 +449,9 @@ public class SparkInterpreter extends Interpreter { b.v_$eq(true); settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - PrintStream printStream = new PrintStream(out); - /* spark interpreter */ this.interpreter = new SparkILoop(null, new PrintWriter(out)); + interpreter.settings_$eq(settings); interpreter.createInterpreter(); @@ -481,7 +477,7 @@ public class 
SparkInterpreter extends Interpreter { dep = getDependencyResolver(); - z = new ZeppelinContext(sc, sqlc, null, dep, printStream, + z = new ZeppelinContext(sc, sqlc, null, dep, Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); @@ -489,7 +485,6 @@ public class SparkInterpreter extends Interpreter { binder.put("sc", sc); binder.put("sqlc", sqlc); binder.put("z", z); - binder.put("out", printStream); intp.interpret("@transient val z = " + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]"); @@ -675,13 +670,13 @@ public class SparkInterpreter extends Interpreter { synchronized (this) { z.setGui(context.getGui()); sc.setJobGroup(getJobGroup(context), "Zeppelin", false); - InterpreterResult r = interpretInput(lines); + InterpreterResult r = interpretInput(lines, context); sc.clearJobGroup(); return r; } } - public InterpreterResult interpretInput(String[] lines) { + public InterpreterResult interpretInput(String[] lines, InterpreterContext context) { SparkEnv.set(env); // add print("") to make sure not finishing with comment @@ -692,8 +687,9 @@ public class SparkInterpreter extends Interpreter { } linesToRun[lines.length] = "print(\"\")"; - Console.setOut((java.io.PrintStream) binder.get("out")); - out.reset(); + Console.setOut(context.out); + out.setInterpreterOutput(context.out); + context.out.clear(); Code r = null; String incomplete = ""; @@ -713,6 +709,7 @@ public class SparkInterpreter extends Interpreter { res = intp.interpret(incomplete + s); } catch (Exception e) { sc.clearJobGroup(); + out.setInterpreterOutput(null); logger.info("Interpreter exception", e); return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); } @@ -721,7 +718,8 @@ public class SparkInterpreter extends Interpreter { if (r == Code.ERROR) { sc.clearJobGroup(); - return new InterpreterResult(r, out.toString()); + out.setInterpreterOutput(null); + 
return new InterpreterResult(r, ""); } else if (r == Code.INCOMPLETE) { incomplete += s + "\n"; } else { @@ -730,9 +728,13 @@ public class SparkInterpreter extends Interpreter { } if (r == Code.INCOMPLETE) { + sc.clearJobGroup(); + out.setInterpreterOutput(null); return new InterpreterResult(r, "Incomplete expression"); } else { - return new InterpreterResult(r, out.toString()); + sc.clearJobGroup(); + out.setInterpreterOutput(null); + return new InterpreterResult(Code.SUCCESS); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java new file mode 100644 index 0000000..98a4090 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.spark; + +import org.apache.zeppelin.interpreter.InterpreterOutput; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * InterpreterOutput can be attached / detached. + */ +public class SparkOutputStream extends OutputStream { + InterpreterOutput interpreterOutput; + + public SparkOutputStream() { + } + + public InterpreterOutput getInterpreterOutput() { + return interpreterOutput; + } + + public void setInterpreterOutput(InterpreterOutput interpreterOutput) { + this.interpreterOutput = interpreterOutput; + } + + @Override + public void write(int b) throws IOException { + if (interpreterOutput != null) { + interpreterOutput.write(b); + } + } + + @Override + public void write(byte [] b) throws IOException { + if (interpreterOutput != null) { + interpreterOutput.write(b); + } + } + + @Override + public void write(byte [] b, int offset, int len) throws IOException { + if (interpreterOutput != null) { + interpreterOutput.write(b, offset, len); + } + } + + @Override + public void close() throws IOException { + if (interpreterOutput != null) { + interpreterOutput.close(); + } + } + + @Override + public void flush() throws IOException { + if (interpreterOutput != null) { + interpreterOutput.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index a55ed73..6869161 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaCollection; import static scala.collection.JavaConversions.asJavaIterable; import static 
scala.collection.JavaConversions.collectionAsScalaIterable; +import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -54,19 +55,17 @@ import scala.collection.Iterable; */ public class ZeppelinContext extends HashMap<String, Object> { private SparkDependencyResolver dep; - private PrintStream out; private InterpreterContext interpreterContext; private int maxResult; public ZeppelinContext(SparkContext sc, SQLContext sql, InterpreterContext interpreterContext, - SparkDependencyResolver dep, PrintStream printStream, + SparkDependencyResolver dep, int maxResult) { this.sc = sc; this.sqlContext = sql; this.interpreterContext = interpreterContext; this.dep = dep; - this.out = printStream; this.maxResult = maxResult; } @@ -273,10 +272,15 @@ public class ZeppelinContext extends HashMap<String, Object> { throw new InterpreterException("Can not road DataFrame/SchemaRDD class"); } - if (cls.isInstance(o)) { - out.print(showDF(sc, interpreterContext, o, maxResult)); - } else { - out.print(o.toString()); + + try { + if (cls.isInstance(o)) { + interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult)); + } else { + interpreterContext.out.write(o.toString()); + } + } catch (IOException e) { + throw new InterpreterException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 62f0a82..7da0f4e 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -36,10 +36,7 @@ class Logger(object): self.out = "" def write(self, message): - self.out = self.out + message - - def get(self): - return self.out + intp.appendOutput(message) def reset(self): 
self.out = "" @@ -224,7 +221,7 @@ while True : sc.setJobGroup(jobGroup, "Zeppelin") eval(compiledCode) - intp.setStatementsFinished(output.get(), False) + intp.setStatementsFinished("", False) except Py4JJavaError: excInnerError = traceback.format_exc() # format_tb() does not return the inner exception innerErrorStart = excInnerError.find("Py4JJavaError:") http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index efa8fae..2b5613a 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -60,7 +60,7 @@ public class DepInterpreterTest { context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>()); + new LinkedList<InterpreterContextRunner>(), null); } @After http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index b629978..778966f 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -28,10 +28,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; -import 
org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.junit.After; import org.junit.Before; @@ -79,9 +76,21 @@ public class SparkInterpreterTest { InterpreterGroup intpGroup = new InterpreterGroup(); context = new InterpreterContext("note", "id", "title", "text", - new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry( - intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>()); + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>(), + new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + + } + })); } @After http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index 4688cf8..731eab6 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -25,10 +25,7 @@ import java.util.Properties; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterGroup; 
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.junit.After;
 import org.junit.Before;
@@ -69,7 +66,17 @@ public class SparkSqlInterpreterTest {
     }
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+
+      }
+    }));
   }
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 0417f91..e3f6b59 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.GUI;
 public class InterpreterContext {
   private static final ThreadLocal<InterpreterContext> threadIC =
       new ThreadLocal<InterpreterContext>();
+  public final InterpreterOutput out;
   public static InterpreterContext get() {
     return threadIC.get();
@@ -58,7 +59,8 @@ public class InterpreterContext {
                             Map<String, Object> config,
                             GUI gui,
                             AngularObjectRegistry angularObjectRegistry,
-                            List<InterpreterContextRunner> runners
+                            List<InterpreterContextRunner> runners,
+                            InterpreterOutput out
                             ) {
     this.noteId = noteId;
     this.paragraphId = paragraphId;
@@ -68,6 +70,7 @@ public class InterpreterContext {
     this.gui = gui;
     this.angularObjectRegistry = angularObjectRegistry;
     this.runners = runners;
+    this.out = out;
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
new file mode 100644
index 0000000..42ebe48
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * InterpreterOutput is OutputStream that supposed to print content on notebook
+ * in addition to InterpreterResult which used to return from Interpreter.interpret().
+ */
+public class InterpreterOutput extends OutputStream {
+  Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+  private final int NEW_LINE_CHAR = '\n';
+
+  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final List<Object> outList = new LinkedList<Object>();
+  private InterpreterOutputChangeWatcher watcher;
+  private final InterpreterOutputListener flushListener;
+  private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
+  private boolean firstWrite = true;
+
+  public InterpreterOutput(InterpreterOutputListener flushListener) {
+    this.flushListener = flushListener;
+    clear();
+  }
+
+  public InterpreterOutput(InterpreterOutputListener flushListener,
+                           InterpreterOutputChangeListener listener) throws IOException {
+    this.flushListener = flushListener;
+    clear();
+    watcher = new InterpreterOutputChangeWatcher(listener);
+    watcher.start();
+  }
+
+  public InterpreterResult.Type getType() {
+    return type;
+  }
+
+  public void setType(InterpreterResult.Type type) {
+    if (this.type != type) {
+      clear();
+      flushListener.onUpdate(this, new byte[]{});
+      this.type = type;
+    }
+  }
+
+  public void clear() {
+    synchronized (outList) {
+      type = InterpreterResult.Type.TEXT;
+      buffer.reset();
+      outList.clear();
+      if (watcher != null) {
+        watcher.clear();
+      }
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    synchronized (outList) {
+      buffer.write(b);
+      if (b == NEW_LINE_CHAR) {
+        // first time use of this outputstream.
+        if (firstWrite) {
+          // clear the output on gui
+          flushListener.onUpdate(this, new byte[]{});
+          firstWrite = false;
+        }
+
+        flush();
+      }
+    }
+  }
+
+  private byte [] detectTypeFromLine(byte [] byteArray) {
+    // check output type directive
+    String line = new String(byteArray);
+    for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
+      String typeString = '%' + t.name().toLowerCase();
+      if ((typeString + "\n").equals(line)) {
+        setType(t);
+        byteArray = null;
+        break;
+      } else if (line.startsWith(typeString + " ")) {
+        setType(t);
+        byteArray = line.substring(typeString.length() + 1).getBytes();
+        break;
+      }
+    }
+
+    return byteArray;
+  }
+
+  @Override
+  public void write(byte [] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte [] b, int off, int len) throws IOException {
+    synchronized (outList) {
+      for (int i = off; i < len; i++) {
+        write(b[i]);
+      }
+    }
+  }
+
+  /**
+   * In dev mode, it monitors file and update ZeppelinServer
+   * @param file
+   * @throws IOException
+   */
+  public void write(File file) throws IOException {
+    outList.add(file);
+    if (watcher != null) {
+      watcher.watch(file);
+    }
+  }
+
+  public void write(String string) throws IOException {
+    write(string.getBytes());
+  }
+
+  /**
+   * write contents in the resource file in the classpath
+   * @param url
+   * @throws IOException
+   */
+  public void write(URL url) throws IOException {
+    if ("file".equals(url.getProtocol())) {
+      write(new File(url.getPath()));
+    } else {
+      outList.add(url);
+    }
+  }
+
+  public void writeResource(String resourceName) throws IOException {
+    // search file under resource dir first for dev mode
+    File mainResource = new File("./src/main/resources/" + resourceName);
+    File testResource = new File("./src/test/resources/" + resourceName);
+    if (mainResource.isFile()) {
+      write(mainResource);
+    } else if (testResource.isFile()) {
+      write(testResource);
+    } else {
+      // search from classpath
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      if (cl == null) {
+        cl = this.getClass().getClassLoader();
+      }
+      if (cl == null) {
+        cl = ClassLoader.getSystemClassLoader();
+      }
+
+      write(cl.getResource(resourceName));
+    }
+  }
+
+  public byte[] toByteArray() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    List<Object> all = new LinkedList<Object>();
+
+    synchronized (outList) {
+      all.addAll(outList);
+    }
+
+    for (Object o : all) {
+      if (o instanceof File) {
+        File f = (File) o;
+        FileInputStream fin = new FileInputStream(f);
+        copyStream(fin, out);
+        fin.close();
+      } else if (o instanceof byte[]) {
+        out.write((byte[]) o);
+      } else if (o instanceof Integer) {
+        out.write((int) o);
+      } else if (o instanceof URL) {
+        InputStream fin = ((URL) o).openStream();
+        copyStream(fin, out);
+        fin.close();
+      } else {
+        // can not handle the object
+      }
+    }
+    out.close();
+    return out.toByteArray();
+  }
+
+  public void flush() throws IOException {
+    synchronized (outList) {
+      buffer.flush();
+      byte[] bytes = buffer.toByteArray();
+      bytes = detectTypeFromLine(bytes);
+      if (bytes != null) {
+        outList.add(bytes);
+        if (type == InterpreterResult.Type.TEXT) {
+          flushListener.onAppend(this, bytes);
+        }
+      }
+      buffer.reset();
+    }
+  }
+
+  private void copyStream(InputStream in, OutputStream out) throws IOException {
+    int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+
+    while (true) {
+      int bytesRead = in.read(buffer);
+      if (bytesRead == -1) {
+        break;
+      } else {
+        out.write(buffer, 0, bytesRead);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+
+    if (watcher != null) {
+      watcher.clear();
+      watcher.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
new file mode 100644
index 0000000..a639e0c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+
+/**
+ * InterpreterOutputChangeListener
+ */
+public interface InterpreterOutputChangeListener {
+  public void fileChanged(File file);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
new file mode 100644
index 0000000..5fe8237
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watch the change for the development mode support
+ */
+public class InterpreterOutputChangeWatcher extends Thread {
+  Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
+
+  private WatchService watcher;
+  private final List<File> watchFiles = new LinkedList<File>();
+  private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
+  private InterpreterOutputChangeListener listener;
+  private boolean stop;
+
+  public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener)
+      throws IOException {
+    watcher = FileSystems.getDefault().newWatchService();
+    this.listener = listener;
+  }
+
+  public void watch(File file) throws IOException {
+    String dirString;
+    if (file.isFile()) {
+      dirString = file.getParentFile().getAbsolutePath();
+    } else {
+      throw new IOException(file.getName() + " is not a file");
+    }
+
+    if (dirString == null) {
+      dirString = "/";
+    }
+
+    Path dir = FileSystems.getDefault().getPath(dirString);
+    logger.info("watch " + dir);
+    WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+    synchronized (watchKeys) {
+      watchKeys.put(key, new File(dirString));
+      watchFiles.add(file);
+    }
+  }
+
+  public void clear() {
+    synchronized (watchKeys) {
+      for (WatchKey key : watchKeys.keySet()) {
+        key.cancel();
+
+      }
+      watchKeys.clear();
+      watchFiles.clear();
+    }
+  }
+
+  public void shutdown() throws IOException {
+    stop = true;
+    clear();
+    watcher.close();
+  }
+
+  public void run() {
+    while (!stop) {
+      WatchKey key = null;
+      try {
+        key = watcher.poll(1, TimeUnit.SECONDS);
+      } catch (InterruptedException | ClosedWatchServiceException e) {
+        break;
+      }
+
+      if (key == null) {
+        continue;
+      }
+      for (WatchEvent<?> event : key.pollEvents()) {
+        WatchEvent.Kind<?> kind = event.kind();
+        if (kind == OVERFLOW) {
+          continue;
+        }
+        WatchEvent<Path> ev = (WatchEvent<Path>) event;
+        Path filename = ev.context();
+        // search for filename
+        synchronized (watchKeys) {
+          for (File f : watchFiles) {
+            if (f.getName().compareTo(filename.toString()) == 0) {
+              File changedFile;
+              if (filename.isAbsolute()) {
+                changedFile = new File(filename.toString());
+              } else {
+                changedFile = new File(watchKeys.get(key), filename.toString());
+              }
+              logger.info("File change detected " + changedFile.getAbsolutePath());
+              if (listener != null) {
+                listener.fileChanged(changedFile);
+              }
+            }
+          }
+        }
+      }
+
+      boolean valid = key.reset();
+      if (!valid) {
+        break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
new file mode 100644
index 0000000..bdb262a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Listen InterpreterOutput buffer flush
+ */
+public interface InterpreterOutputListener {
+  /**
+   * called when newline is detected
+   * @param line
+   */
+  public void onAppend(InterpreterOutput out, byte[] line);
+
+  /**
+   * when entire output is updated. eg) after detecting new display system
+   * @param output
+   */
+  public void onUpdate(InterpreterOutput out, byte[] output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 593cfc7..d213796 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -146,4 +146,8 @@ public class InterpreterResult implements Serializable {
     this.type = type;
     return this;
   }
+
+  public String toString() {
+    return "%" + type.name().toLowerCase() + " " + msg;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 455156c..d2a24e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -48,6 +48,7 @@ import com.google.gson.reflect.TypeToken;
  *
  */
 public class RemoteInterpreter extends Interpreter {
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
   Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
   Gson gson = new Gson();
   private String interpreterRunner;
@@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter {
   private int connectTimeout;
   public RemoteInterpreter(Properties property,
-      String className,
-      String interpreterRunner,
-      String interpreterPath,
-      int connectTimeout) {
+                           String className,
+                           String interpreterRunner,
+                           String interpreterPath,
+                           int connectTimeout,
+                           RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
-
     this.className = className;
     initialized = false;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     env = new HashMap<String, String>();
     this.connectTimeout = connectTimeout;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
   public RemoteInterpreter(Properties property,
-      String className,
-      String interpreterRunner,
-      String interpreterPath,
-      Map<String, String> env,
-      int connectTimeout) {
+                           String className,
+                           String interpreterRunner,
+                           String interpreterPath,
+                           Map<String, String> env,
+                           int connectTimeout,
+                           RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
     this.className = className;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     this.env = env;
     this.connectTimeout = connectTimeout;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
   @Override
@@ -103,7 +107,8 @@ public class RemoteInterpreter extends Interpreter {
       if (intpGroup.getRemoteInterpreterProcess() == null) {
         // create new remote process
         RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
-            interpreterRunner, interpreterPath, env, connectTimeout);
+            interpreterRunner, interpreterPath, env, connectTimeout,
+            remoteInterpreterProcessListener);
         intpGroup.setRemoteInterpreterProcess(remoteProcess);
       }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c39e0fe..6186205 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -18,29 +18,35 @@
 package org.apache.zeppelin.interpreter.remote;
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.Map;
+
 /**
  *
  */
 public class RemoteInterpreterEventPoller extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  private final RemoteInterpreterProcessListener listener;
   private volatile boolean shutdown;
   private RemoteInterpreterProcess interpreterProcess;
   private InterpreterGroup interpreterGroup;
-  public RemoteInterpreterEventPoller() {
+  public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
+    this.listener = listener;
     shutdown = false;
   }
@@ -110,6 +116,24 @@ public class RemoteInterpreterEventPoller extends Thread {
           interpreterProcess.getInterpreterContextRunnerPool().run(
               runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+          // on output append
+          Map<String, String> outputAppend = gson.fromJson(
+              event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+          String noteId = outputAppend.get("noteId");
+          String paragraphId = outputAppend.get("paragraphId");
+          String outputToAppend = outputAppend.get("data");
+
+          listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+          // on output update
+          Map<String, String> outputAppend = gson.fromJson(
+              event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+          String noteId = outputAppend.get("noteId");
+          String paragraphId = outputAppend.get("paragraphId");
+          String outputToUpdate = outputAppend.get("data");
+
+          listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
         }
         logger.debug("Event from remoteproceess {}", event.getType());
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c195dc..56b5485 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
   private int connectTimeout;
   public RemoteInterpreterProcess(String intpRunner,
-      String intpDir,
-      Map<String, String> env,
-      int connectTimeout) {
-    this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
+                                  String intpDir,
+                                  Map<String, String> env,
+                                  int connectTimeout,
+                                  RemoteInterpreterProcessListener listener) {
+    this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
   }
   RemoteInterpreterProcess(String intpRunner,

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..da6ac63
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Event from remoteInterpreterProcess
+ */
+public interface RemoteInterpreterProcessListener {
+  public void onOutputAppend(String noteId, String paragraphId, String output);
+  public void onOutputUpdated(String noteId, String paragraphId, String output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a8da8c0..728d210 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
@@ -300,7 +293,26 @@ public class RemoteInterpreterServer
       try {
         InterpreterContext.set(context);
         InterpreterResult result = interpreter.interpret(script, context);
-        return result;
+
+        // data from context.out is prepended to InterpreterResult if both defined
+        String message = "";
+
+        context.out.flush();
+        InterpreterResult.Type outputType = context.out.getType();
+        byte[] interpreterOutput = context.out.toByteArray();
+        context.out.clear();
+
+        if (interpreterOutput != null && interpreterOutput.length > 0) {
+          message = new String(interpreterOutput);
+        }
+
+        String interpreterResultMessage = result.message();
+        if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+          message += interpreterResultMessage;
+          return new InterpreterResult(result.code(), result.type(), message);
+        } else {
+          return new InterpreterResult(result.code(), outputType, message);
+        }
       } finally {
         InterpreterContext.remove();
       }
@@ -351,7 +363,8 @@ public class RemoteInterpreterServer
   private InterpreterContext convert(RemoteInterpreterContext ric) {
     List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
     List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
-        new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
+            new TypeToken<List<RemoteInterpreterContextRunner>>() {
+            }.getType());
     for (InterpreterContextRunner r : runners) {
       contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
@@ -366,7 +379,40 @@ public class RemoteInterpreterServer
         new TypeToken<Map<String, Object>>() {}.getType()),
         gson.fromJson(ric.getGui(), GUI.class),
         interpreterGroup.getAngularObjectRegistry(),
-        contextRunners);
+        contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
+  }
+
+
+  private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
+    return new InterpreterOutput(new InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+        Map<String, String> appendOutput = new HashMap<String, String>();
+        appendOutput.put("noteId", noteId);
+        appendOutput.put("paragraphId", paragraphId);
+        appendOutput.put("data", new String(line));
+
+        Gson gson = new Gson();
+
+        sendEvent(new RemoteInterpreterEvent(
+            RemoteInterpreterEventType.OUTPUT_APPEND,
+            gson.toJson(appendOutput)));
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+        Map<String, String> appendOutput = new HashMap<String, String>();
+        appendOutput.put("noteId", noteId);
+        appendOutput.put("paragraphId", paragraphId);
+        appendOutput.put("data", new String(output));
+
+        Gson gson = new Gson();
+
+        sendEvent(new RemoteInterpreterEvent(
+            RemoteInterpreterEventType.OUTPUT_UPDATE,
+            gson.toJson(appendOutput)));
+      }
+    });
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index a55d5de..175f482 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 96a49b5..79203fb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 9a7d142..d650318 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   ANGULAR_OBJECT_ADD(2),
   ANGULAR_OBJECT_UPDATE(3),
   ANGULAR_OBJECT_REMOVE(4),
-  RUN_INTERPRETER_CONTEXT_RUNNER(5);
+  RUN_INTERPRETER_CONTEXT_RUNNER(5),
+  OUTPUT_APPEND(6),
+  OUTPUT_UPDATE(7);
   private final int value;
@@ -64,6 +66,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
         return ANGULAR_OBJECT_REMOVE;
       case 5:
         return RUN_INTERPRETER_CONTEXT_RUNNER;
+      case 6:
+        return OUTPUT_APPEND;
+      case 7:
+        return OUTPUT_UPDATE;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 36c0f25..cc50f9c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 6e6730e..738b453 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterService {
   public interface Iface {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 144784c..65fd0a7 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++
b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -42,7 +42,9 @@ enum RemoteInterpreterEventType { ANGULAR_OBJECT_ADD = 2, ANGULAR_OBJECT_UPDATE = 3, ANGULAR_OBJECT_REMOVE = 4, - RUN_INTERPRETER_CONTEXT_RUNNER = 5 + RUN_INTERPRETER_CONTEXT_RUNNER = 5, + OUTPUT_APPEND = 6, + OUTPUT_UPDATE = 7 } struct RemoteInterpreterEvent { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java index 080bdaa..9c2732d 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java @@ -27,7 +27,7 @@ public class InterpreterContextTest { public void testThreadLocal() { assertNull(InterpreterContext.get()); - InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null)); + InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null)); assertNotNull(InterpreterContext.get()); InterpreterContext.remove(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java new file mode 100644 index 0000000..e376809 --- /dev/null +++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener { + private File tmpDir; + private File fileChanged; + private int numChanged; + private InterpreterOutputChangeWatcher watcher; + + @Before + public void setUp() throws Exception { + watcher = new InterpreterOutputChangeWatcher(this); + watcher.start(); + + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + fileChanged = null; + numChanged = 0; + } + + @After + public void tearDown() throws Exception { + watcher.shutdown(); + delete(tmpDir); + } + + private void delete(File file){ + if(file.isFile()) file.delete(); + else if(file.isDirectory()){ + File [] files = file.listFiles(); + if(files!=null && files.length>0){ + for(File f : files){ + 
delete(f); + } + } + file.delete(); + } + } + + + @Test + public void test() throws IOException, InterruptedException { + assertNull(fileChanged); + assertEquals(0, numChanged); + + Thread.sleep(1000); + // create new file + File file1 = new File(tmpDir, "test1"); + file1.createNewFile(); + + File file2 = new File(tmpDir, "test2"); + file2.createNewFile(); + + watcher.watch(file1); + Thread.sleep(1000); + + FileOutputStream out1 = new FileOutputStream(file1); + out1.write(1); + out1.close(); + + FileOutputStream out2 = new FileOutputStream(file2); + out2.write(1); + out2.close(); + + synchronized (this) { + wait(30*1000); + } + + assertNotNull(fileChanged); + assertEquals(1, numChanged); + } + + + @Override + public void fileChanged(File file) { + fileChanged = file; + numChanged++; + + synchronized(this) { + notify(); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java new file mode 100644 index 0000000..f8f4809 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class InterpreterOutputTest implements InterpreterOutputListener { + private InterpreterOutput out; + int numAppendEvent; + int numUpdateEvent; + + @Before + public void setUp() { + out = new InterpreterOutput(this); + numAppendEvent = 0; + numUpdateEvent = 0; + } + + @After + public void tearDown() throws IOException { + out.close(); + } + + @Test + public void testDetectNewline() throws IOException { + out.write("hello\nworld"); + assertEquals("hello\n", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(1, numUpdateEvent); + + out.write("\n"); + assertEquals("hello\nworld\n", new String(out.toByteArray())); + assertEquals(2, numAppendEvent); + assertEquals(1, numUpdateEvent); + } + + @Test + public void testFlush() throws IOException { + out.write("hello\nworld"); + assertEquals("hello\n", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(1, numUpdateEvent); + + out.flush(); + assertEquals("hello\nworld", new String(out.toByteArray())); + assertEquals(2, numAppendEvent); + assertEquals(1, numUpdateEvent); + + out.clear(); + out.write("%html div"); + assertEquals("", new String(out.toByteArray())); + assertEquals(InterpreterResult.Type.TEXT, out.getType()); + + out.flush(); + out.write("%html div"); + assertEquals("div", new String(out.toByteArray())); + 
assertEquals(InterpreterResult.Type.HTML, out.getType()); + } + + @Test + public void testType() throws IOException { + // default output stream type is TEXT + out.write("Text\n"); + assertEquals(InterpreterResult.Type.TEXT, out.getType()); + assertEquals("Text\n", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(1, numUpdateEvent); + + // change type + out.write("%html\n"); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + assertEquals("", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(2, numUpdateEvent); + + // none TEXT type output stream does not generate append event + out.write("<div>html</div>\n"); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + assertEquals(1, numAppendEvent); + assertEquals(2, numUpdateEvent); + assertEquals("<div>html</div>\n", new String(out.toByteArray())); + + // change type to text again + out.write("%text hello\n"); + assertEquals(InterpreterResult.Type.TEXT, out.getType()); + assertEquals(2, numAppendEvent); + assertEquals(3, numUpdateEvent); + assertEquals("hello\n", new String(out.toByteArray())); + } + + @Test + public void testType2() throws IOException { + out.write("%html\nHello"); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + } + + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + numAppendEvent++; + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + numUpdateEvent++; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java index 007730a..d7ab9e8 100644 
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java @@ -105,4 +105,9 @@ public class InterpreterResultTest { "123\n", result.message()); } + @Test + public void testToString() { + assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 29a1fb1..906878d 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -64,12 +64,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { Properties p = new Properties(); intp = new RemoteInterpreter( - p, - MockInterpreterAngular.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterAngular.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intp); @@ -83,7 +84,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>()); + new LinkedList<InterpreterContextRunner>(), null); intp.open(); }
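Note on the newline-driven append behaviour exercised by InterpreterOutputTest above: the tests expect that writing "hello\nworld" exposes only "hello\n" and fires one append event, and that a later newline or flush() releases the rest. The following is a minimal sketch of that idea only. `LineBufferedOutput` and its `Consumer<String>` callback are hypothetical names chosen for illustration; this is not Zeppelin's `InterpreterOutput`, and it leaves out the `%html`/`%text` type directives and the update events.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for illustration; not Zeppelin's InterpreterOutput. */
class LineBufferedOutput extends OutputStream {
  // Assumed callback: invoked once for every chunk that becomes visible.
  private final Consumer<String> onAppend;
  // Bytes written since the last newline or flush; not yet visible.
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
  // Bytes already reported through onAppend.
  private final ByteArrayOutputStream committed = new ByteArrayOutputStream();

  LineBufferedOutput(Consumer<String> onAppend) {
    this.onAppend = onAppend;
  }

  @Override
  public synchronized void write(int b) {
    pending.write(b);
    if (b == '\n') {
      commitPending(); // a completed line is released immediately
    }
  }

  @Override
  public synchronized void flush() {
    if (pending.size() > 0) {
      commitPending(); // release a partial line on explicit flush
    }
  }

  private void commitPending() {
    byte[] chunk = pending.toByteArray();
    pending.reset();
    committed.write(chunk, 0, chunk.length);
    onAppend.accept(new String(chunk, StandardCharsets.UTF_8));
  }

  /** Returns only the bytes that have been released so far. */
  public synchronized byte[] toByteArray() {
    return committed.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    List<String> events = new ArrayList<>();
    LineBufferedOutput out = new LineBufferedOutput(events::add);

    out.write("hello\nworld".getBytes(StandardCharsets.UTF_8));
    System.out.println(events.size()); // one line completed so far

    out.write('\n');
    System.out.println(events.size()); // second line completed

    out.close();
  }
}
```

In this sketch the callback plays the role that `onAppend` plays in the listener above: a caller could serialize each released chunk and forward it as an event, instead of collecting it in a list.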
