Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 7d133b619 -> 0fde27fda


ZEPPELIN-331 Don't send the update back to the browser that updated the angular object

This PR fixes the problem described in
https://issues.apache.org/jira/browse/ZEPPELIN-331

It does so by excluding the web browser that created the angular object
update event from the broadcast targets.

* [x] exclude from broadcast
* [x] add unittest

Author: Lee moon soo <[email protected]>

Closes #341 from Leemoonsoo/ZEPPELIN-331 and squashes the following commits:

40f8f84 [Lee moon soo] Change log level to debug for SEND message
cd45a7f [Lee moon soo] Fix test
8175c8d [Lee moon soo] Add mock interpreter
527c56f [Lee moon soo] Add test for broadcast for angularObjectUpdate
4c4ce6d [Lee moon soo] Don't update back to the browser where update the 
angular object


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/0fde27fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/0fde27fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/0fde27fd

Branch: refs/heads/master
Commit: 0fde27fda6fdc4c76165925cd3060a59eace1c8a
Parents: 7d133b6
Author: Lee moon soo <[email protected]>
Authored: Sat Nov 14 20:39:54 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sun Nov 15 15:41:55 2015 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/socket/NotebookServer.java  |  32 +++++-
 .../interpreter/mock/MockInterpreter1.java      |  73 +++++++++++++
 .../zeppelin/socket/NotebookServerTest.java     | 102 ++++++++++++++++++-
 3 files changed, 201 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0fde27fd/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index f295a8e..e987461 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -264,6 +264,26 @@ public class NotebookServer extends WebSocketServlet 
implements
     }
   }
 
+  /**
+   * Sends a message to every socket registered for a note, skipping one socket.
+   *
+   * <p>Used so that the connection which originated an update does not receive its own
+   * update back.
+   *
+   * @param noteId  id of the note whose registered sockets receive the message
+   * @param m       the message to serialize and send
+   * @param exclude the socket to skip; when {@code null} no socket is skipped
+   */
+  private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
+    synchronized (noteSocketMap) {
+      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
+      if (socketLists == null || socketLists.isEmpty()) {
+        return;
+      }
+      LOG.debug("SEND >> " + m.op);
+      for (NotebookSocket conn : socketLists) {
+        // Guard against a null exclude: calling exclude.equals(conn) directly would throw
+        // a NullPointerException and abort the whole broadcast.
+        if (exclude != null && exclude.equals(conn)) {
+          continue;
+        }
+        try {
+          conn.send(serializeMessage(m));
+        } catch (IOException e) {
+          // A failure on one socket is logged and the remaining sockets are still served.
+          LOG.error("socket error", e);
+        }
+      }
+    }
+  }
+
   private void broadcastAll(Message m) {
     synchronized (connectedSockets) {
       for (NotebookSocket conn : connectedSockets) {
@@ -498,7 +518,7 @@ public class NotebookServer extends WebSocketServlet 
implements
    * @param notebook the notebook.
    * @param fromMessage the message.
    */
-  private void angularObjectUpdated(WebSocket conn, Notebook notebook,
+  private void angularObjectUpdated(NotebookSocket conn, Notebook notebook,
       Message fromMessage) {
     String noteId = (String) fromMessage.get("noteId");
     String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
@@ -552,20 +572,22 @@ public class NotebookServer extends WebSocketServlet 
implements
           if 
(interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
             AngularObjectRegistry angularObjectRegistry = setting
                 .getInterpreterGroup().getAngularObjectRegistry();
-            this.broadcast(
+            this.broadcastExcept(
                 n.id(),
                 new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
                     .put("interpreterGroupId", interpreterGroupId)
-                    .put("noteId", n.id()));
+                    .put("noteId", n.id()),
+                conn);
           }
         }
       }
     } else { // broadcast to all web session for the note
-      this.broadcast(
+      this.broadcastExcept(
           note.id(),
           new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
               .put("interpreterGroupId", interpreterGroupId)
-              .put("noteId", note.id()));
+              .put("noteId", note.id()),
+          conn);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0fde27fd/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
new file mode 100644
index 0000000..b76a8b2
--- /dev/null
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.mock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreter1 extends Interpreter{
+  Map<String, Object> vars = new HashMap<String, Object>();
+
+  public MockInterpreter1(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0fde27fd/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index c17809a..5275d81 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -20,16 +20,53 @@
 package org.apache.zeppelin.socket;
 
 import static org.junit.Assert.*;
+
 import java.io.IOException;
+
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.socket.Message.OP;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.gson.Gson;
+
 import java.net.UnknownHostException;
 import java.net.InetAddress;
+import java.util.List;
+
+import javax.servlet.http.HttpServletRequest;
+
+import static org.mockito.Mockito.*;
+
 
 /**
  * BASIC Zeppelin rest api tests
  */
-public class NotebookServerTest {
+public class NotebookServerTest extends AbstractTestRestApi {
+
+
+  private static Notebook notebook;
+  private static NotebookServer notebookServer;
+  private static Gson gson;
+
+  /**
+   * Runs once before the tests in this class: starts the test server through
+   * {@link AbstractTestRestApi#startUp()} and caches the handles the tests use.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    // Start up first; the static handles below are read only after startUp() returns.
+    AbstractTestRestApi.startUp();
+    notebookServer = ZeppelinServer.notebookServer;
+    notebook = ZeppelinServer.notebook;
+    gson = new Gson();
+  }
+
+  /**
+   * Runs once after all tests in this class and delegates to
+   * {@link AbstractTestRestApi#shutDown()}.
+   */
+  @AfterClass
+  public static void destroy() throws Exception {
+    AbstractTestRestApi.shutDown();
+  }
 
   @Test
   public void checkOrigin() throws UnknownHostException {
@@ -45,5 +82,68 @@ public class NotebookServerTest {
     NotebookServer server = new NotebookServer();
     assertFalse(server.checkOrigin(new TestHttpServletRequest(), 
"http://evillocalhost:8080";));
   }
+
+  /**
+   * Verifies that an ANGULAR_OBJECT_UPDATED message received from one socket is sent on to
+   * the other socket viewing the same note, but not back to the socket that sent it.
+   */
+  @Test
+  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent()
+      throws IOException {
+    // create a notebook
+    Note note1 = notebook.createNote();
+    try {
+      // get reference to the first available interpreterGroup
+      InterpreterGroup interpreterGroup = null;
+      List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings();
+      for (InterpreterSetting setting : settings) {
+        if (setting.getInterpreterGroup() == null) {
+          continue;
+        }
+
+        interpreterGroup = setting.getInterpreterGroup();
+        break;
+      }
+      // Fail with a readable message instead of a NullPointerException further down.
+      assertNotNull("no interpreter setting with an interpreter group is bound to the note",
+          interpreterGroup);
+
+      // add angularObject
+      interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId());
+
+      // create two sockets and open them
+      NotebookSocket sock1 = createWebSocket();
+      NotebookSocket sock2 = createWebSocket();
+
+      // Sanity check: the mocks must be distinguishable through equals(), because the
+      // broadcast picks the socket to skip with equals().
+      assertEquals(sock1, sock1);
+      assertNotEquals(sock1, sock2);
+
+      notebookServer.onOpen(sock1);
+      notebookServer.onOpen(sock2);
+      // nothing is sent merely because a connection was opened
+      verify(sock1, never()).send(anyString());
+
+      // open the same notebook from both sockets
+      String getNote = gson.toJson(new Message(OP.GET_NOTE).put("id", note1.getId()));
+      notebookServer.onMessage(sock1, getNote);
+      notebookServer.onMessage(sock2, getNote);
+
+      // forget the interactions recorded so far; only the update below is verified
+      reset(sock1);
+      reset(sock2);
+
+      // update object from sock1
+      notebookServer.onMessage(sock1, gson.toJson(
+          new Message(OP.ANGULAR_OBJECT_UPDATED)
+          .put("noteId", note1.getId())
+          .put("name", "object1")
+          .put("value", "value1")
+          .put("interpreterGroupId", interpreterGroup.getId())));
+
+      // expect the object to be broadcast to everyone except the socket that sent the update
+      verify(sock1, never()).send(anyString());
+      verify(sock2, times(1)).send(anyString());
+    } finally {
+      // Remove the note even when an assertion fails so it does not leak into other tests.
+      notebook.removeNote(note1.getId());
+    }
+  }
+
+  /**
+   * Builds a Mockito mock of {@link NotebookSocket} whose {@code getRequest()} returns a
+   * mocked servlet request.
+   *
+   * @return a new mock socket
+   */
+  private NotebookSocket createWebSocket() {
+    HttpServletRequest request = createHttpServletRequest();
+    NotebookSocket sock = mock(NotebookSocket.class);
+    when(sock.getRequest()).thenReturn(request);
+    return sock;
+  }
+
+  /**
+   * @return a new Mockito mock of {@link HttpServletRequest} with no stubbing applied
+   */
+  private HttpServletRequest createHttpServletRequest() {
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    return request;
+  }
 }
 

Reply via email to