Repository: incubator-zeppelin Updated Branches: refs/heads/master dbdaf84e4 -> 5ec59a81b
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java new file mode 100644 index 0000000..623a037 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + + +/** + * Test for remote interpreter output stream + */ +public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener { + private InterpreterGroup intpGroup; + private HashMap<String, String> env; + + @Before + public void setUp() throws Exception { + intpGroup = new InterpreterGroup(); + env = new HashMap<String, String>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + } + + @After + public void tearDown() throws Exception { + intpGroup.close(); + intpGroup.destroy(); + } + + private RemoteInterpreter createMockInterpreter() { + RemoteInterpreter intp = new RemoteInterpreter( + new Properties(), + MockInterpreterOutputStream.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + this); + + + intpGroup.add(intp); + intp.setInterpreterGroup(intpGroup); + return intp; + } + + private InterpreterContext createInterpreterContext() { + return new InterpreterContext( + "noteId", + "id", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>(), null); + } + + @Test + public void testInterpreterResultOnly() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + 
assertEquals("staticresult", ret.message()); + + ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult2", ret.message()); + + ret = intp.interpret("ERROR::staticresult3", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("staticresult3", ret.message()); + } + + @Test + public void testInterpreterOutputStreamOnly() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("streamresult", ret.message()); + + ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("streamresult2", ret.message()); + } + + @Test + public void testInterpreterResultOutputStreamMixed() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("streamstatic", ret.message()); + } + + @Test + public void testOutputType() { + RemoteInterpreter intp = createMockInterpreter(); + + InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.type()); + assertEquals("hello", ret.message()); + + ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.type()); + assertEquals("hello", ret.message()); + + ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext()); + assertEquals(InterpreterResult.Type.ANGULAR, ret.type()); + assertEquals("helloworld", ret.message()); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, String 
output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, String output) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index ea5397e..abee5b8 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterProcess rip = new RemoteInterpreterProcess( "../bin/interpreter.sh", "nonexists", new HashMap<String, String>(), - 10 * 1000); + 10 * 1000, null); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); assertEquals(1, rip.reference(intpGroup)); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index c938ff3..034a676 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -63,30 +63,38 @@ public class 
RemoteInterpreterTest { intpGroup.destroy(); } + private RemoteInterpreter createMockInterpreterA(Properties p) { + return new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null); + } + + private RemoteInterpreter createMockInterpreterB(Properties p) { + return new RemoteInterpreter( + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null); + } + @Test public void testRemoteInterperterCall() throws TTransportException, IOException { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpB = createMockInterpreterB(p); intpGroup.add(intpB); intpB.setInterpreterGroup(intpGroup); @@ -113,7 +121,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); intpB.open(); assertEquals(2, process.referenceCount()); @@ -131,14 +139,7 @@ public class RemoteInterpreterTest { public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); 
intpA.setInterpreterGroup(intpGroup); @@ -153,7 +154,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); assertEquals(Code.ERROR, ret.code()); } @@ -163,24 +164,26 @@ public class RemoteInterpreterTest { Properties p = new Properties(); RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpB); @@ -199,7 +202,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); assertEquals("500", ret.message()); ret = intpB.interpret("500", @@ -211,7 +214,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); assertEquals("1000", ret.message()); long end = System.currentTimeMillis(); assertTrue(end - start >= 1000); @@ -225,26 +228,12 @@ public class RemoteInterpreterTest { public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { Properties p = new 
Properties(); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - final RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpB = createMockInterpreterB(p); intpGroup.add(intpB); intpB.setInterpreterGroup(intpGroup); @@ -276,7 +265,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); } @Override @@ -310,7 +299,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); } @Override @@ -340,14 +329,7 @@ public class RemoteInterpreterTest { public void testRunOrderPreserved() throws InterruptedException { Properties p = new Properties(); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -382,7 +364,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); synchronized (results) { results.add(ret.message()); @@ -421,14 +403,7 @@ public class RemoteInterpreterTest { Properties 
p = new Properties(); p.put("parallel", "true"); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -466,7 +441,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); synchronized (results) { results.add(ret.message()); @@ -501,14 +476,7 @@ public class RemoteInterpreterTest { public void testInterpreterGroupResetBeforeProcessStarts() { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpA.setInterpreterGroup(intpGroup); RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); @@ -523,14 +491,7 @@ public class RemoteInterpreterTest { public void testInterpreterGroupResetAfterProcessFinished() { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpA.setInterpreterGroup(intpGroup); RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); @@ -548,14 +509,7 @@ public class RemoteInterpreterTest { public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { Properties p = new Properties(); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - 
env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -585,7 +539,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); } @Override @@ -616,26 +570,12 @@ public class RemoteInterpreterTest { public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpB = createMockInterpreterB(p); intpGroup.add(intpB); intpB.setInterpreterGroup(intpGroup); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java new file mode 100644 index 0000000..bc1859f --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * MockInterpreter to test outputstream + */ +public class MockInterpreterOutputStream extends Interpreter { + static { + Interpreter.register( + "interpreterOutputStream", + "group1", + MockInterpreterA.class.getName(), + new InterpreterPropertyBuilder().build()); + + } + + private String lastSt; + + public MockInterpreterOutputStream(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] ret = st.split(":"); + try { + if (ret[1] != null) { + context.out.write(ret[1]); + } + } catch (IOException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? 
+ ret[2] : ""); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d17df4f..05bc676 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -64,12 +64,13 @@ public class RemoteSchedulerTest { env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpA); @@ -103,7 +104,7 @@ public class RemoteSchedulerTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>())); + new LinkedList<InterpreterContextRunner>(), null)); return "1000"; } @@ -147,12 +148,13 @@ public class RemoteSchedulerTest { env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath()); final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpA); @@ -173,7 +175,7 @@ public class RemoteSchedulerTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>()); + new LinkedList<InterpreterContextRunner>(), null); @Override public int progress() { @@ -209,7 +211,7 @@ public class RemoteSchedulerTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>()); + new LinkedList<InterpreterContextRunner>(), null); @Override public int progress() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 9e7a97c..dff75c7 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -81,7 +81,8 @@ public class ZeppelinServer extends Application { this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO)); this.schedulerFactory = new SchedulerFactory(); - this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver); + this.replFactory = new InterpreterFactory(conf, notebookWsServer, + notebookWsServer, depResolver); this.notebookRepo = new NotebookRepoSync(conf); this.notebookIndex = 
new LuceneSearch(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index 0142df2..4296e93 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -93,6 +93,8 @@ public class Message { PARAGRAPH_REMOVE, PARAGRAPH_CLEAR_OUTPUT, + PARAGRAPH_APPEND_OUTPUT, // [s-c] append output + PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output PING, ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 3dfdca3..64698fc 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -30,12 +30,11 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import 
org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -57,7 +56,8 @@ import com.google.gson.Gson; * */ public class NotebookServer extends WebSocketServlet implements - NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { + NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, + RemoteInterpreterProcessListener { private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); Gson gson = new Gson(); final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>(); @@ -749,14 +749,46 @@ public class NotebookServer extends WebSocketServlet implements } /** + * This callback is for the paragraph that runs on ZeppelinServer + * @param noteId + * @param paragraphId + * @param output output to append + */ + @Override + public void onOutputAppend(String noteId, String paragraphId, String output) { + Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT) + .put("noteId", noteId) + .put("paragraphId", paragraphId) + .put("data", output); + Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId); + broadcast(noteId, msg); + } + + /** + * This callback is for the paragraph that runs on ZeppelinServer + * @param noteId + * @param paragraphId + * @param output output to update (replace) + */ + @Override + public void onOutputUpdated(String noteId, String paragraphId, String output) { + Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT) + .put("noteId", noteId) + .put("paragraphId", paragraphId) + .put("data", output); + Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId); + broadcast(noteId, msg); + } + + /** * Need description here. 
* */ - public static class ParagraphJobListener implements JobListener { + public static class ParagraphListenerImpl implements ParagraphJobListener { private NotebookServer notebookServer; private Note note; - public ParagraphJobListener(NotebookServer notebookServer, Note note) { + public ParagraphListenerImpl(NotebookServer notebookServer, Note note) { this.notebookServer = notebookServer; this.note = note; } @@ -791,11 +823,43 @@ public class NotebookServer extends WebSocketServlet implements } notebookServer.broadcastNote(note); } + + /** + * This callback is for paragraph that runs on RemoteInterpreterProcess + * @param paragraph + * @param out + * @param output + */ + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT) + .put("noteId", paragraph.getNote().getId()) + .put("paragraphId", paragraph.getId()) + .put("data", output); + + notebookServer.broadcast(paragraph.getNote().getId(), msg); + } + + /** + * This callback is for paragraph that runs on RemoteInterpreterProcess + * @param paragraph + * @param out + * @param output + */ + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT) + .put("noteId", paragraph.getNote().getId()) + .put("paragraphId", paragraph.getId()) + .put("data", output); + + notebookServer.broadcast(paragraph.getNote().getId(), msg); + } } @Override - public JobListener getParagraphJobListener(Note note) { - return new ParagraphJobListener(this, note); + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphListenerImpl(this, note); } private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html 
---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html index 80af4ef..7fb40ac 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html @@ -23,24 +23,22 @@ limitations under the License. ng-bind-html="paragraph.result.comment"> </div> - <div id="{{paragraph.id}}_text" + <div id="p{{paragraph.id}}_text" class="text" - ng-if="paragraph.result.type == 'TEXT'" - ng-bind="paragraph.result.msg"> - </div> + ng-if="getResultType() == 'TEXT'"></div> <div id="p{{paragraph.id}}_html" class="resultContained" - ng-if="paragraph.result.type == 'HTML'"> + ng-if="getResultType() == 'HTML'"> </div> <div id="p{{paragraph.id}}_angular" class="resultContained" - ng-if="paragraph.result.type == 'ANGULAR'"> + ng-if="getResultType() == 'ANGULAR'"> </div> <img id="{{paragraph.id}}_img" - ng-if="paragraph.result.type == 'IMG'" + ng-if="getResultType() == 'IMG'" ng-src="{{getBase64ImageSrc(paragraph.result.msg)}}"> </img> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index be88498..30c7ea4 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -54,11 +54,13 @@ angular.module('zeppelinWebApp') $scope.renderHtml(); } else if ($scope.getResultType() === 'ANGULAR') { $scope.renderAngular(); + } else if ($scope.getResultType() === 'TEXT') { + $scope.renderText(); } }; - $scope.renderHtml = function() { - var retryRenderer = function() { + $scope.renderHtml = 
function() { + var retryRenderer = function() { if (angular.element('#p' + $scope.paragraph.id + '_html').length) { try { angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg); @@ -93,6 +95,42 @@ angular.module('zeppelinWebApp') $timeout(retryRenderer); }; + $scope.renderText = function() { + var retryRenderer = function() { + + var textEl = angular.element('#p' + $scope.paragraph.id + '_text'); + if (textEl.length) { + // clear all lines before render + $scope.clearTextOutput(); + + if ($scope.paragraph.result && $scope.paragraph.result.msg) { + $scope.appendTextOutput($scope.paragraph.result.msg); + } + } else { + $timeout(retryRenderer, 10); + } + }; + $timeout(retryRenderer); + }; + + $scope.clearTextOutput = function() { + var textEl = angular.element('#p' + $scope.paragraph.id + '_text'); + if (textEl.length) { + textEl.children().remove(); + } + }; + + $scope.appendTextOutput = function(msg) { + var textEl = angular.element('#p' + $scope.paragraph.id + '_text'); + if (textEl.length) { + var lines = msg.split('\n'); + for (var i=0; i < lines.length; i++) { + textEl.append(angular.element('<div></div>').text(lines[i])); + } + } + }; + + var initializeDefault = function() { var config = $scope.paragraph.config; @@ -156,6 +194,10 @@ angular.module('zeppelinWebApp') } }); + var isEmpty = function (object) { + return !object; + }; + // TODO: this may have impact on performance when there are many paragraphs in a note. 
$scope.$on('updateParagraph', function(event, data) { if (data.paragraph.id === $scope.paragraph.id && @@ -166,6 +208,7 @@ angular.module('zeppelinWebApp') data.paragraph.status !== $scope.paragraph.status || data.paragraph.jobName !== $scope.paragraph.jobName || data.paragraph.title !== $scope.paragraph.title || + isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result) || data.paragraph.errorMessage !== $scope.paragraph.errorMessage || !angular.equals(data.paragraph.settings, $scope.paragraph.settings) || !angular.equals(data.paragraph.config, $scope.paragraph.config)) @@ -175,7 +218,8 @@ angular.module('zeppelinWebApp') var newType = $scope.getResultType(data.paragraph); var oldGraphMode = $scope.getGraphMode(); var newGraphMode = $scope.getGraphMode(data.paragraph); - var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished); + var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished) || isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result); + var statusChanged = (data.paragraph.status !== $scope.paragraph.status); //console.log("updateParagraph oldData %o, newData %o. 
type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode); @@ -234,6 +278,8 @@ angular.module('zeppelinWebApp') $scope.renderHtml(); } else if (newType === 'ANGULAR' && resultRefreshed) { $scope.renderAngular(); + } else if (newType === 'TEXT' && resultRefreshed) { + $scope.renderText(); } if (statusChanged || resultRefreshed) { @@ -252,6 +298,19 @@ angular.module('zeppelinWebApp') }); + $scope.$on('appendParagraphOutput', function(event, data) { + if ($scope.paragraph.id === data.paragraphId) { + $scope.appendTextOutput(data.data); + } + }); + + $scope.$on('updateParagraphOutput', function(event, data) { + if ($scope.paragraph.id === data.paragraphId) { + $scope.clearTextOutput(data.data); + $scope.appendTextOutput(data.data); + } + }); + $scope.isRunning = function() { if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') { return true; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index bb99d56..800d450 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -54,6 +54,10 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $rootScope.$broadcast('setNoteMenu', data.notes); } else if (op === 'PARAGRAPH') { $rootScope.$broadcast('updateParagraph', data); + } else if (op === 'PARAGRAPH_APPEND_OUTPUT') { + $rootScope.$broadcast('appendParagraphOutput', data); + } else if (op === 'PARAGRAPH_UPDATE_OUTPUT') { + $rootScope.$broadcast('updateParagraphOutput', data); } else if (op === 'PROGRESS') { $rootScope.$broadcast('updateProgress', data); 
} else if (op === 'COMPLETION_LIST') { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 4ff0cc3..039d970 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; @@ -65,25 +66,30 @@ public class InterpreterFactory { private InterpreterOption defaultOption; AngularObjectRegistryListener angularObjectRegistryListener; + private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; DependencyResolver depResolver; public InterpreterFactory(ZeppelinConfiguration conf, AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, DependencyResolver depResolver) throws InterpreterException, IOException { - this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver); + this(conf, new InterpreterOption(true), angularObjectRegistryListener, + remoteInterpreterProcessListener, depResolver); } public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption, 
AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, DependencyResolver depResolver) throws InterpreterException, IOException { this.conf = conf; this.defaultOption = defaultOption; this.angularObjectRegistryListener = angularObjectRegistryListener; this.depResolver = depResolver; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); interpreterClassList = replsConf.split(","); @@ -500,7 +506,8 @@ public class InterpreterFactory { /** * Change interpreter property and restart - * @param name + * @param id + * @param option * @param properties * @throws IOException */ @@ -659,7 +666,7 @@ public class InterpreterFactory { int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter( property, className, conf.getInterpreterRemoteRunnerPath(), - interpreterPath, connectTimeout)); + interpreterPath, connectTimeout, remoteInterpreterProcessListener)); return intp; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java index 5a7e966..1387730 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java @@ -17,11 +17,9 @@ package org.apache.zeppelin.notebook; -import org.apache.zeppelin.scheduler.JobListener; - /** * TODO(moon): provide description. 
*/ public interface JobListenerFactory { - public JobListener getParagraphJobListener(Note note); + public ParagraphJobListener getParagraphJobListener(Note note); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 392b968..10f080d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -19,39 +19,43 @@ package org.apache.zeppelin.notebook; import java.io.IOException; import java.io.Serializable; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.utility.IdHashes; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.search.SearchService; -import 
org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Binded interpreters for a note */ public class Note implements Serializable, JobListener { + static Logger logger = LoggerFactory.getLogger(Note.class); private static final long serialVersionUID = 7920699076577612429L; + // threadpool for delayed persist of note + private static final ScheduledThreadPoolExecutor delayedPersistThreadPool = + new ScheduledThreadPoolExecutor(0); + static { + delayedPersistThreadPool.setRemoveOnCancelPolicy(true); + } + final List<Paragraph> paragraphs = new LinkedList<>(); + private String name = ""; private String id; @@ -62,6 +66,7 @@ public class Note implements Serializable, JobListener { private transient JobListenerFactory jobListenerFactory; private transient NotebookRepo repo; private transient SearchService index; + private transient ScheduledFuture delayedPersist; /** * note configurations. @@ -144,9 +149,8 @@ public class Note implements Serializable, JobListener { /** * Add paragraph last. - * - * @param p */ + public Paragraph addParagraph() { Paragraph p = new Paragraph(this, this, replLoader); synchronized (paragraphs) { @@ -187,7 +191,6 @@ public class Note implements Serializable, JobListener { * Insert paragraph in given index. * * @param index - * @param p */ public Paragraph insertParagraph(int index) { Paragraph p = new Paragraph(this, this, replLoader); @@ -339,8 +342,6 @@ public class Note implements Serializable, JobListener { /** * Run all paragraphs sequentially. - * - * @param jobListener */ public void runAll() { synchronized (paragraphs) { @@ -400,15 +401,55 @@ public class Note implements Serializable, JobListener { } public void persist() throws IOException { + stopDelayedPersistTimer(); snapshotAngularObjectRegistry(); index.updateIndexDoc(this); repo.save(this); } + /** + * Persist this note with maximum delay. 
+ * @param maxDelaySec + */ + public void persist(int maxDelaySec) { + startDelayedPersistTimer(maxDelaySec); + } + public void unpersist() throws IOException { repo.remove(id()); } + + private void startDelayedPersistTimer(int maxDelaySec) { + synchronized (this) { + if (delayedPersist != null) { + return; + } + + delayedPersist = delayedPersistThreadPool.schedule(new Runnable() { + + @Override + public void run() { + try { + persist(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + }, maxDelaySec, TimeUnit.SECONDS); + } + } + + private void stopDelayedPersistTimer() { + synchronized (this) { + if (delayedPersist == null) { + return; + } + + delayedPersist.cancel(false); + } + } + public Map<String, Object> getConfig() { if (config == null) { config = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 433095b..65210f5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -28,6 +28,7 @@ import org.apache.zeppelin.scheduler.JobListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.*; @@ -213,7 +214,29 @@ public class Paragraph extends Job implements Serializable, Cloneable { if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { return getReturn(); } - return ret; + + String message = ""; + + context.out.flush(); + InterpreterResult.Type outputType = context.out.getType(); + byte[] interpreterOutput = context.out.toByteArray(); + context.out.clear(); + + if (interpreterOutput != null && 
interpreterOutput.length > 0) { + message = new String(interpreterOutput); + } + + if (message.isEmpty()) { + return ret; + } else { + String interpreterResultMessage = ret.message(); + if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) { + message += interpreterResultMessage; + return new InterpreterResult(ret.code(), ret.type(), message); + } else { + return new InterpreterResult(ret.code(), outputType, message); + } + } } finally { InterpreterContext.remove(); } @@ -244,6 +267,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { runners.add(new ParagraphRunner(note, note.id(), p.getId())); } + final Paragraph self = this; InterpreterContext interpreterContext = new InterpreterContext( note.id(), getId(), @@ -252,7 +276,34 @@ public class Paragraph extends Job implements Serializable, Cloneable { this.getConfig(), this.settings, registry, - runners); + runners, + new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputUpdate(self, out, + new String(output)); + } + + private void updateParagraphResult(InterpreterOutput out) { + // update paragraph result + Throwable t = null; + String message = null; + try { + message = new String(out.toByteArray()); + } catch (IOException e) { + logger().error(e.getMessage(), e); + t = e; + } + setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t); + } + })); return interpreterContext; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java ---------------------------------------------------------------------- diff 
--git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java new file mode 100644 index 0000000..f6404d7 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.scheduler.JobListener; + +/** + * Listen paragraph update + */ +public interface ParagraphJobListener extends JobListener { + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output); + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index abd0e3b..17d91cc 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -55,8 +55,8 @@ public class InterpreterFactoryTest { System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); conf = new ZeppelinConfiguration(); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null); } @@ -140,7 +140,7 @@ public class InterpreterFactoryTest { factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties()); assertEquals(3, factory.get().size()); - 
InterpreterFactory factory2 = new InterpreterFactory(conf, null, null); + InterpreterFactory factory2 = new InterpreterFactory(conf, null, null, null); assertEquals(3, factory2.get().size()); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java index 9496e4d..4fa8ef6 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java @@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest { MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11"); MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); } @After http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index e98e680..82ba137 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -38,6 +38,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import 
org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; @@ -85,7 +86,7 @@ public class NotebookTest implements JobListenerFactory{ MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -172,7 +173,8 @@ public class NotebookTest implements JobListenerFactory{ note.persist(); Notebook notebook2 = new Notebook( - conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null); + conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null, null), this, + null); assertEquals(1, notebook2.getAllNotes().size()); } @@ -411,8 +413,16 @@ public class NotebookTest implements JobListenerFactory{ } @Override - public JobListener getParagraphJobListener(Note note) { - return new JobListener(){ + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphJobListener(){ + + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + } + + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + } @Override public void onProgressUpdate(Job job, int progress) { 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 60b3ba3..31970af 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -30,12 +30,10 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -87,7 +85,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); SearchService search = mock(SearchService.class); notebookRepoSync = new 
NotebookRepoSync(conf); @@ -224,8 +222,16 @@ public class NotebookRepoSyncTest implements JobListenerFactory { } @Override - public JobListener getParagraphJobListener(Note note) { - return new JobListener(){ + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphJobListener(){ + + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + } + + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + } @Override public void onProgressUpdate(Job job, int progress) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java index cff086d..2e2801c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java @@ -30,10 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.SearchService; @@ -76,7 +73,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory { 
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); this.schedulerFactory = new SchedulerFactory(); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -140,7 +137,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory { } @Override - public JobListener getParagraphJobListener(Note note) { + public ParagraphJobListener getParagraphJobListener(Note note) { return null; } }
