Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master dbdaf84e4 -> 5ec59a81b


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..623a037
--- /dev/null
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test for remote interpreter output stream
+ */
+public class RemoteInterpreterOutputTestStream implements 
RemoteInterpreterProcessListener {
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  @Before
+  public void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+    intpGroup.destroy();
+  }
+
+  private RemoteInterpreter createMockInterpreter() {
+    RemoteInterpreter intp = new RemoteInterpreter(
+            new Properties(),
+            MockInterpreterOutputStream.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            this);
+
+
+    intpGroup.add(intp);
+    intp.setInterpreterGroup(intpGroup);
+    return intp;
+  }
+
+  private InterpreterContext createInterpreterContext() {
+    return new InterpreterContext(
+            "noteId",
+            "id",
+            "title",
+            "text",
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LinkedList<InterpreterContextRunner>(), null);
+  }
+
+  @Test
+  public void testInterpreterResultOnly() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS::staticresult", 
createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult", ret.message());
+
+    ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult2", ret.message());
+
+    ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("staticresult3", ret.message());
+  }
+
+  @Test
+  public void testInterpreterOutputStreamOnly() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", 
createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("streamresult", ret.message());
+
+    ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("streamresult2", ret.message());
+  }
+
+  @Test
+  public void testInterpreterResultOutputStreamMixed() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS:stream:static", 
createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("streamstatic", ret.message());
+  }
+
+  @Test
+  public void testOutputType() {
+    RemoteInterpreter intp = createMockInterpreter();
+
+    InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", 
createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.type());
+    assertEquals("hello", ret.message());
+
+    ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.type());
+    assertEquals("hello", ret.message());
+
+    ret = intp.interpret("SUCCESS:%html hello:%angular world", 
createInterpreterContext());
+    assertEquals(InterpreterResult.Type.ANGULAR, ret.type());
+    assertEquals("helloworld", ret.message());
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String output) 
{
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String 
output) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index ea5397e..abee5b8 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest {
     InterpreterGroup intpGroup = new InterpreterGroup();
     RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
         "../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
-        10 * 1000);
+        10 * 1000, null);
     assertFalse(rip.isRunning());
     assertEquals(0, rip.referenceCount());
     assertEquals(1, rip.reference(intpGroup));

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index c938ff3..034a676 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -63,30 +63,38 @@ public class RemoteInterpreterTest {
     intpGroup.destroy();
   }
 
+  private RemoteInterpreter createMockInterpreterA(Properties p) {
+    return new RemoteInterpreter(
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null);
+  }
+
+  private RemoteInterpreter createMockInterpreterB(Properties p) {
+    return new RemoteInterpreter(
+            p,
+            MockInterpreterB.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null);
+  }
+
   @Test
   public void testRemoteInterperterCall() throws TTransportException, 
IOException {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpB = createMockInterpreterB(p);
 
     intpGroup.add(intpB);
     intpB.setInterpreterGroup(intpGroup);
@@ -113,7 +121,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
 
     intpB.open();
     assertEquals(2, process.referenceCount());
@@ -131,14 +139,7 @@ public class RemoteInterpreterTest {
   public void testRemoteInterperterErrorStatus() throws TTransportException, 
IOException {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -153,7 +154,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
 
     assertEquals(Code.ERROR, ret.code());
   }
@@ -163,24 +164,26 @@ public class RemoteInterpreterTest {
     Properties p = new Properties();
 
     RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterB.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpB);
@@ -199,7 +202,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("500", ret.message());
 
     ret = intpB.interpret("500",
@@ -211,7 +214,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("1000", ret.message());
     long end = System.currentTimeMillis();
     assertTrue(end - start >= 1000);
@@ -225,26 +228,12 @@ public class RemoteInterpreterTest {
   public void testRemoteSchedulerSharingSubmit() throws TTransportException, 
IOException, InterruptedException {
     Properties p = new Properties();
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
-    final RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpB = createMockInterpreterB(p);
 
     intpGroup.add(intpB);
     intpB.setInterpreterGroup(intpGroup);
@@ -276,7 +265,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
-                new LinkedList<InterpreterContextRunner>()));
+                new LinkedList<InterpreterContextRunner>(), null));
       }
 
       @Override
@@ -310,7 +299,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
-                new LinkedList<InterpreterContextRunner>()));
+                new LinkedList<InterpreterContextRunner>(), null));
       }
 
       @Override
@@ -340,14 +329,7 @@ public class RemoteInterpreterTest {
   public void testRunOrderPreserved() throws InterruptedException {
     Properties p = new Properties();
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -382,7 +364,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
-              new LinkedList<InterpreterContextRunner>()));
+              new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
             results.add(ret.message());
@@ -421,14 +403,7 @@ public class RemoteInterpreterTest {
     Properties p = new Properties();
     p.put("parallel", "true");
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -466,7 +441,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
-              new LinkedList<InterpreterContextRunner>()));
+              new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
             results.add(ret.message());
@@ -501,14 +476,7 @@ public class RemoteInterpreterTest {
   public void testInterpreterGroupResetBeforeProcessStarts() {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpA.setInterpreterGroup(intpGroup);
     RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@@ -523,14 +491,7 @@ public class RemoteInterpreterTest {
   public void testInterpreterGroupResetAfterProcessFinished() {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpA.setInterpreterGroup(intpGroup);
     RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@@ -548,14 +509,7 @@ public class RemoteInterpreterTest {
   public void testInterpreterGroupResetDuringProcessRunning() throws 
InterruptedException {
     Properties p = new Properties();
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -585,7 +539,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
-                new LinkedList<InterpreterContextRunner>()));
+                new LinkedList<InterpreterContextRunner>(), null));
       }
 
       @Override
@@ -616,26 +570,12 @@ public class RemoteInterpreterTest {
   public void 
testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpB = createMockInterpreterB(p);
 
     intpGroup.add(intpB);
     intpB.setInterpreterGroup(intpGroup);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..bc1859f
--- /dev/null
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MockInterpreter to test outputstream
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+  static {
+    Interpreter.register(
+            "interpreterOutputStream",
+            "group1",
+            MockInterpreterA.class.getName(),
+            new InterpreterPropertyBuilder().build());
+
+  }
+
+  private String lastSt;
+
+  public MockInterpreterOutputStream(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] ret = st.split(":");
+    try {
+      if (ret[1] != null) {
+        context.out.write(ret[1]);
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), 
(ret.length > 2) ?
+            ret[2] : "");
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index d17df4f..05bc676 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -64,12 +64,13 @@ public class RemoteSchedulerTest {
     env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
 
     final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpA);
@@ -103,7 +104,7 @@ public class RemoteSchedulerTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
         return "1000";
       }
 
@@ -147,12 +148,13 @@ public class RemoteSchedulerTest {
     env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
 
     final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpA);
@@ -173,7 +175,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
-          new LinkedList<InterpreterContextRunner>());
+          new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
       public int progress() {
@@ -209,7 +211,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
-          new LinkedList<InterpreterContextRunner>());
+          new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
       public int progress() {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 9e7a97c..dff75c7 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -81,7 +81,8 @@ public class ZeppelinServer extends Application {
 
     this.depResolver = new 
DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
     this.schedulerFactory = new SchedulerFactory();
-    this.replFactory = new InterpreterFactory(conf, notebookWsServer, 
depResolver);
+    this.replFactory = new InterpreterFactory(conf, notebookWsServer,
+            notebookWsServer, depResolver);
     this.notebookRepo = new NotebookRepoSync(conf);
     this.notebookIndex = new LuceneSearch();
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
index 0142df2..4296e93 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
@@ -93,6 +93,8 @@ public class Message {
 
     PARAGRAPH_REMOVE,
     PARAGRAPH_CLEAR_OUTPUT,
+    PARAGRAPH_APPEND_OUTPUT,  // [s-c] append output
+    PARAGRAPH_UPDATE_OUTPUT,  // [s-c] update (replace) output
     PING,
 
     ANGULAR_OBJECT_UPDATE,  // [s-c] add/update angular object

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 3dfdca3..64698fc 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -30,12 +30,11 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -57,7 +56,8 @@ import com.google.gson.Gson;
  *
  */
 public class NotebookServer extends WebSocketServlet implements
-        NotebookSocketListener, JobListenerFactory, 
AngularObjectRegistryListener {
+        NotebookSocketListener, JobListenerFactory, 
AngularObjectRegistryListener,
+        RemoteInterpreterProcessListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(NotebookServer.class);
   Gson gson = new Gson();
   final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
@@ -749,14 +749,46 @@ public class NotebookServer extends WebSocketServlet 
implements
   }
 
   /**
+   * This callback is for the paragraph that runs on ZeppelinServer
+   * @param noteId
+   * @param paragraphId
+   * @param output output to append
+   */
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String output) 
{
+    Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
+            .put("noteId", noteId)
+            .put("paragraphId", paragraphId)
+            .put("data", output);
+    Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
+    broadcast(noteId, msg);
+  }
+
+  /**
+   * This callback is for the paragraph that runs on ZeppelinServer
+   * @param noteId
+   * @param paragraphId
+   * @param output output to update (replace)
+   */
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String 
output) {
+    Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
+            .put("noteId", noteId)
+            .put("paragraphId", paragraphId)
+            .put("data", output);
+    Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
+    broadcast(noteId, msg);
+  }
+
+  /**
    * Need description here.
    *
    */
-  public static class ParagraphJobListener implements JobListener {
+  public static class ParagraphListenerImpl implements ParagraphJobListener {
     private NotebookServer notebookServer;
     private Note note;
 
-    public ParagraphJobListener(NotebookServer notebookServer, Note note) {
+    public ParagraphListenerImpl(NotebookServer notebookServer, Note note) {
       this.notebookServer = notebookServer;
       this.note = note;
     }
@@ -791,11 +823,43 @@ public class NotebookServer extends WebSocketServlet 
implements
       }
       notebookServer.broadcastNote(note);
     }
+
+    /**
+     * This callback is for praragraph that runs on RemoteInterpreterProcess
+     * @param paragraph
+     * @param out
+     * @param output
+     */
+    @Override
+    public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, 
String output) {
+      Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
+              .put("noteId", paragraph.getNote().getId())
+              .put("paragraphId", paragraph.getId())
+              .put("data", output);
+
+      notebookServer.broadcast(paragraph.getNote().getId(), msg);
+    }
+
+    /**
+     * This callback is for paragraph that runs on RemoteInterpreterProcess
+     * @param paragraph
+     * @param out
+     * @param output
+     */
+    @Override
+    public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, 
String output) {
+      Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
+              .put("noteId", paragraph.getNote().getId())
+              .put("paragraphId", paragraph.getId())
+              .put("data", output);
+
+      notebookServer.broadcast(paragraph.getNote().getId(), msg);
+    }
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
-    return new ParagraphJobListener(this, note);
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphListenerImpl(this, note);
   }
 
   private void sendAllAngularObjects(Note note, NotebookSocket conn) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html 
b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
index 80af4ef..7fb40ac 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
@@ -23,24 +23,22 @@ limitations under the License.
        ng-bind-html="paragraph.result.comment">
   </div>
 
-  <div id="{{paragraph.id}}_text"
+  <div id="p{{paragraph.id}}_text"
        class="text"
-       ng-if="paragraph.result.type == 'TEXT'"
-       ng-bind="paragraph.result.msg">
-  </div>
+       ng-if="getResultType() == 'TEXT'"></div>
 
   <div id="p{{paragraph.id}}_html"
        class="resultContained"
-       ng-if="paragraph.result.type == 'HTML'">
+       ng-if="getResultType() == 'HTML'">
   </div>
 
   <div id="p{{paragraph.id}}_angular"
        class="resultContained"
-       ng-if="paragraph.result.type == 'ANGULAR'">
+       ng-if="getResultType() == 'ANGULAR'">
   </div>
 
   <img id="{{paragraph.id}}_img"
-       ng-if="paragraph.result.type == 'IMG'"
+       ng-if="getResultType() == 'IMG'"
        ng-src="{{getBase64ImageSrc(paragraph.result.msg)}}">
   </img>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js 
b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index be88498..30c7ea4 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -54,11 +54,13 @@ angular.module('zeppelinWebApp')
       $scope.renderHtml();
     } else if ($scope.getResultType() === 'ANGULAR') {
       $scope.renderAngular();
+    } else if ($scope.getResultType() === 'TEXT') {
+      $scope.renderText();
     }
   };
 
-  $scope.renderHtml = function() {
-    var retryRenderer = function() {
+    $scope.renderHtml = function() {
+      var retryRenderer = function() {
       if (angular.element('#p' + $scope.paragraph.id + '_html').length) {
         try {
           angular.element('#p' + $scope.paragraph.id + 
'_html').html($scope.paragraph.result.msg);
@@ -93,6 +95,42 @@ angular.module('zeppelinWebApp')
     $timeout(retryRenderer);
   };
 
+  $scope.renderText = function() {
+    var retryRenderer = function() {
+
+      var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+      if (textEl.length) {
+        // clear all lines before render
+        $scope.clearTextOutput();
+
+        if ($scope.paragraph.result && $scope.paragraph.result.msg) {
+          $scope.appendTextOutput($scope.paragraph.result.msg);
+        }
+      } else {
+        $timeout(retryRenderer, 10);
+      }
+    };
+    $timeout(retryRenderer);
+  };
+
+  $scope.clearTextOutput = function() {
+    var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+    if (textEl.length) {
+      textEl.children().remove();
+    }
+  };
+
+  $scope.appendTextOutput = function(msg) {
+    var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+    if (textEl.length) {
+      var lines = msg.split('\n');
+      for (var i=0; i < lines.length; i++) {
+        textEl.append(angular.element('<div></div>').text(lines[i]));
+      }
+    }
+  };
+
+
 
   var initializeDefault = function() {
     var config = $scope.paragraph.config;
@@ -156,6 +194,10 @@ angular.module('zeppelinWebApp')
     }
   });
 
+  var isEmpty = function (object) {
+    return !object;
+  };
+
   // TODO: this may have impact on performance when there are many paragraphs 
in a note.
   $scope.$on('updateParagraph', function(event, data) {
     if (data.paragraph.id === $scope.paragraph.id &&
@@ -166,6 +208,7 @@ angular.module('zeppelinWebApp')
          data.paragraph.status !== $scope.paragraph.status ||
          data.paragraph.jobName !== $scope.paragraph.jobName ||
          data.paragraph.title !== $scope.paragraph.title ||
+         isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result) ||
          data.paragraph.errorMessage !== $scope.paragraph.errorMessage ||
          !angular.equals(data.paragraph.settings, $scope.paragraph.settings) ||
          !angular.equals(data.paragraph.config, $scope.paragraph.config))
@@ -175,7 +218,8 @@ angular.module('zeppelinWebApp')
       var newType = $scope.getResultType(data.paragraph);
       var oldGraphMode = $scope.getGraphMode();
       var newGraphMode = $scope.getGraphMode(data.paragraph);
-      var resultRefreshed = (data.paragraph.dateFinished !== 
$scope.paragraph.dateFinished);
+      var resultRefreshed = (data.paragraph.dateFinished !== 
$scope.paragraph.dateFinished) || isEmpty(data.paragraph.result) !== 
isEmpty($scope.paragraph.result);
+
       var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
 
       //console.log("updateParagraph oldData %o, newData %o. type %o -> %o, 
mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, 
newGraphMode);
@@ -234,6 +278,8 @@ angular.module('zeppelinWebApp')
         $scope.renderHtml();
       } else if (newType === 'ANGULAR' && resultRefreshed) {
         $scope.renderAngular();
+      } else if (newType === 'TEXT' && resultRefreshed) {
+        $scope.renderText();
       }
 
       if (statusChanged || resultRefreshed) {
@@ -252,6 +298,19 @@ angular.module('zeppelinWebApp')
 
   });
 
+  $scope.$on('appendParagraphOutput', function(event, data) {
+    if ($scope.paragraph.id === data.paragraphId) {
+      $scope.appendTextOutput(data.data);
+    }
+  });
+
+  $scope.$on('updateParagraphOutput', function(event, data) {
+    if ($scope.paragraph.id === data.paragraphId) {
+      $scope.clearTextOutput(data.data);
+      $scope.appendTextOutput(data.data);
+    }
+  });
+
   $scope.isRunning = function() {
     if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 
'PENDING') {
       return true;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
----------------------------------------------------------------------
diff --git 
a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js 
b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
index bb99d56..800d450 100644
--- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
+++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
@@ -54,6 +54,10 @@ angular.module('zeppelinWebApp').factory('websocketEvents', 
function($rootScope,
       $rootScope.$broadcast('setNoteMenu', data.notes);
     } else if (op === 'PARAGRAPH') {
       $rootScope.$broadcast('updateParagraph', data);
+    } else if (op === 'PARAGRAPH_APPEND_OUTPUT') {
+      $rootScope.$broadcast('appendParagraphOutput', data);
+    } else if (op === 'PARAGRAPH_UPDATE_OUTPUT') {
+      $rootScope.$broadcast('updateParagraphOutput', data);      
     } else if (op === 'PROGRESS') {
       $rootScope.$broadcast('updateProgress', data);
     } else if (op === 'COMPLETION_LIST') {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 4ff0cc3..039d970 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -29,6 +29,7 @@ import 
org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.slf4j.Logger;
@@ -65,25 +66,30 @@ public class InterpreterFactory {
   private InterpreterOption defaultOption;
 
   AngularObjectRegistryListener angularObjectRegistryListener;
+  private final RemoteInterpreterProcessListener 
remoteInterpreterProcessListener;
 
   DependencyResolver depResolver;
 
   public InterpreterFactory(ZeppelinConfiguration conf,
       AngularObjectRegistryListener angularObjectRegistryListener,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
       DependencyResolver depResolver)
       throws InterpreterException, IOException {
-    this(conf, new InterpreterOption(true), angularObjectRegistryListener, 
depResolver);
+    this(conf, new InterpreterOption(true), angularObjectRegistryListener,
+            remoteInterpreterProcessListener, depResolver);
   }
 
 
   public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption 
defaultOption,
       AngularObjectRegistryListener angularObjectRegistryListener,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
       DependencyResolver depResolver)
       throws InterpreterException, IOException {
     this.conf = conf;
     this.defaultOption = defaultOption;
     this.angularObjectRegistryListener = angularObjectRegistryListener;
     this.depResolver = depResolver;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
     String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
     interpreterClassList = replsConf.split(",");
 
@@ -500,7 +506,8 @@ public class InterpreterFactory {
 
   /**
    * Change interpreter property and restart
-   * @param name
+   * @param id
+   * @param option
    * @param properties
    * @throws IOException
    */
@@ -659,7 +666,7 @@ public class InterpreterFactory {
     int connectTimeout = 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
     LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
         property, className, conf.getInterpreterRemoteRunnerPath(),
-        interpreterPath, connectTimeout));
+        interpreterPath, connectTimeout, remoteInterpreterProcessListener));
     return intp;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
index 5a7e966..1387730 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
@@ -17,11 +17,9 @@
 
 package org.apache.zeppelin.notebook;
 
-import org.apache.zeppelin.scheduler.JobListener;
-
 /**
  * TODO(moon): provide description.
  */
 public interface JobListenerFactory {
-  public JobListener getParagraphJobListener(Note note);
+  public ParagraphJobListener getParagraphJobListener(Note note);
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 392b968..10f080d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -19,39 +19,43 @@ package org.apache.zeppelin.notebook;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.utility.IdHashes;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.apache.zeppelin.search.SearchService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Binded interpreters for a note
  */
 public class Note implements Serializable, JobListener {
+  static Logger logger = LoggerFactory.getLogger(Note.class);
   private static final long serialVersionUID = 7920699076577612429L;
 
+  // threadpool for delayed persist of note
+  private static final ScheduledThreadPoolExecutor delayedPersistThreadPool =
+          new ScheduledThreadPoolExecutor(0);
+  static {
+    delayedPersistThreadPool.setRemoveOnCancelPolicy(true);
+  }
+
   final List<Paragraph> paragraphs = new LinkedList<>();
+
   private String name = "";
   private String id;
 
@@ -62,6 +66,7 @@ public class Note implements Serializable, JobListener {
   private transient JobListenerFactory jobListenerFactory;
   private transient NotebookRepo repo;
   private transient SearchService index;
+  private transient ScheduledFuture delayedPersist;
 
   /**
    * note configurations.
@@ -144,9 +149,8 @@ public class Note implements Serializable, JobListener {
 
   /**
    * Add paragraph last.
-   *
-   * @param p
    */
+
   public Paragraph addParagraph() {
     Paragraph p = new Paragraph(this, this, replLoader);
     synchronized (paragraphs) {
@@ -187,7 +191,6 @@ public class Note implements Serializable, JobListener {
    * Insert paragraph in given index.
    *
    * @param index
-   * @param p
    */
   public Paragraph insertParagraph(int index) {
     Paragraph p = new Paragraph(this, this, replLoader);
@@ -339,8 +342,6 @@ public class Note implements Serializable, JobListener {
 
   /**
    * Run all paragraphs sequentially.
-   *
-   * @param jobListener
    */
   public void runAll() {
     synchronized (paragraphs) {
@@ -400,15 +401,55 @@ public class Note implements Serializable, JobListener {
   }
 
   public void persist() throws IOException {
+    stopDelayedPersistTimer();
     snapshotAngularObjectRegistry();
     index.updateIndexDoc(this);
     repo.save(this);
   }
 
+  /**
+   * Persist this note with maximum delay.
+   * @param maxDelaySec
+   */
+  public void persist(int maxDelaySec) {
+    startDelayedPersistTimer(maxDelaySec);
+  }
+
   public void unpersist() throws IOException {
     repo.remove(id());
   }
 
+
+  private void startDelayedPersistTimer(int maxDelaySec) {
+    synchronized (this) {
+      if (delayedPersist != null) {
+        return;
+      }
+
+      delayedPersist = delayedPersistThreadPool.schedule(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            persist();
+          } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+          }
+        }
+      }, maxDelaySec, TimeUnit.SECONDS);
+    }
+  }
+
+  private void stopDelayedPersistTimer() {
+    synchronized (this) {
+      if (delayedPersist == null) {
+        return;
+      }
+
+      delayedPersist.cancel(false);
+    }
+  }
+
   public Map<String, Object> getConfig() {
     if (config == null) {
       config = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 433095b..65210f5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.scheduler.JobListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
 
@@ -213,7 +214,29 @@ public class Paragraph extends Job implements 
Serializable, Cloneable {
       if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
         return getReturn();
       }
-      return ret;
+
+      String message = "";
+
+      context.out.flush();
+      InterpreterResult.Type outputType = context.out.getType();
+      byte[] interpreterOutput = context.out.toByteArray();
+      context.out.clear();
+
+      if (interpreterOutput != null && interpreterOutput.length > 0) {
+        message = new String(interpreterOutput);
+      }
+
+      if (message.isEmpty()) {
+        return ret;
+      } else {
+        String interpreterResultMessage = ret.message();
+        if (interpreterResultMessage != null && 
!interpreterResultMessage.isEmpty()) {
+          message += interpreterResultMessage;
+          return new InterpreterResult(ret.code(), ret.type(), message);
+        } else {
+          return new InterpreterResult(ret.code(), outputType, message);
+        }
+      }
     } finally {
       InterpreterContext.remove();
     }
@@ -244,6 +267,7 @@ public class Paragraph extends Job implements Serializable, 
Cloneable {
       runners.add(new ParagraphRunner(note, note.id(), p.getId()));
     }
 
+    final Paragraph self = this;
     InterpreterContext interpreterContext = new InterpreterContext(
             note.id(),
             getId(),
@@ -252,7 +276,34 @@ public class Paragraph extends Job implements 
Serializable, Cloneable {
             this.getConfig(),
             this.settings,
             registry,
-            runners);
+            runners,
+            new InterpreterOutput(new InterpreterOutputListener() {
+              @Override
+              public void onAppend(InterpreterOutput out, byte[] line) {
+                updateParagraphResult(out);
+                ((ParagraphJobListener) getListener()).onOutputAppend(self, 
out, new String(line));
+              }
+
+              @Override
+              public void onUpdate(InterpreterOutput out, byte[] output) {
+                updateParagraphResult(out);
+                ((ParagraphJobListener) getListener()).onOutputUpdate(self, 
out,
+                        new String(output));
+              }
+
+              private void updateParagraphResult(InterpreterOutput out) {
+                // update paragraph result
+                Throwable t = null;
+                String message = null;
+                try {
+                  message = new String(out.toByteArray());
+                } catch (IOException e) {
+                  logger().error(e.getMessage(), e);
+                  t = e;
+                }
+                setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), 
message), t);
+              }
+            }));
     return interpreterContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
new file mode 100644
index 0000000..f6404d7
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.scheduler.JobListener;
+
+/**
+ * Listen paragraph update
+ */
+public interface ParagraphJobListener extends JobListener {
+  public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, 
String output);
+  public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, 
String output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index abd0e3b..17d91cc 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -55,8 +55,8 @@ public class InterpreterFactoryTest {
     System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), 
tmpDir.getAbsolutePath());
     System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), 
"org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
     conf = new ZeppelinConfiguration();
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null);
-    context = new InterpreterContext("note", "id", "title", "text", null, 
null, null, null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null, null);
+    context = new InterpreterContext("note", "id", "title", "text", null, 
null, null, null, null);
 
   }
 
@@ -140,7 +140,7 @@ public class InterpreterFactoryTest {
     factory.add("newsetting", "mock1", new InterpreterOption(false), new 
Properties());
     assertEquals(3, factory.get().size());
 
-    InterpreterFactory factory2 = new InterpreterFactory(conf, null, null);
+    InterpreterFactory factory2 = new InterpreterFactory(conf, null, null, 
null);
     assertEquals(3, factory2.get().size());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
index 9496e4d..4fa8ef6 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest {
     MockInterpreter11.register("mock11", "group1", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter11");
     MockInterpreter2.register("mock2", "group2", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null, null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index e98e680..82ba137 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -38,6 +38,7 @@ import 
org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
@@ -85,7 +86,7 @@ public class NotebookTest implements JobListenerFactory{
     MockInterpreter1.register("mock1", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter1");
     MockInterpreter2.register("mock2", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null, null);
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
@@ -172,7 +173,8 @@ public class NotebookTest implements JobListenerFactory{
     note.persist();
 
     Notebook notebook2 = new Notebook(
-        conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, 
null, null), this, null);
+        conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, 
null, null, null), this,
+            null);
     assertEquals(1, notebook2.getAllNotes().size());
   }
 
@@ -411,8 +413,16 @@ public class NotebookTest implements JobListenerFactory{
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
-    return new JobListener(){
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphJobListener(){
+
+      @Override
+      public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, 
String output) {
+      }
+
+      @Override
+      public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, 
String output) {
+      }
 
       @Override
       public void onProgressUpdate(Job job, int progress) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index 60b3ba3..31970af 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -30,12 +30,10 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -87,7 +85,7 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     MockInterpreter1.register("mock1", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter1");
     MockInterpreter2.register("mock2", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null, null);
     
     SearchService search = mock(SearchService.class);
     notebookRepoSync = new NotebookRepoSync(conf);
@@ -224,8 +222,16 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
-    return new JobListener(){
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphJobListener(){
+
+      @Override
+      public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, 
String output) {
+      }
+
+      @Override
+      public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, 
String output) {
+      }
 
       @Override
       public void onProgressUpdate(Job job, int progress) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
index cff086d..2e2801c 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
@@ -30,10 +30,7 @@ import 
org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.search.SearchService;
@@ -76,7 +73,7 @@ public class VFSNotebookRepoTest implements 
JobListenerFactory {
     MockInterpreter1.register("mock1", 
"org.apache.zeppelin.interpreter.mock.MockInterpreter1");
 
     this.schedulerFactory = new SchedulerFactory();
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, 
null, null);
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
@@ -140,7 +137,7 @@ public class VFSNotebookRepoTest implements 
JobListenerFactory {
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
+  public ParagraphJobListener getParagraphJobListener(Note note) {
     return null;
   }
 }

Reply via email to