Repository: incubator-zeppelin Updated Branches: refs/heads/master 7d133b619 -> 0fde27fda
ZEPPELIN-331 Don't update back the browser where updated the angular object This PR fixes problem described in https://issues.apache.org/jira/browse/ZEPPELIN-331 By excluding the webbrowser who created the angular object update event from broadcast target. * [x] exclude from broadcast * [x] add unittest Author: Lee moon soo <[email protected]> Closes #341 from Leemoonsoo/ZEPPELIN-331 and squashes the following commits: 40f8f84 [Lee moon soo] Change log level to debug for SEND message cd45a7f [Lee moon soo] Fix test 8175c8d [Lee moon soo] Add mock interpreter 527c56f [Lee moon soo] Add test for broadcast for angularObjectUpdate 4c4ce6d [Lee moon soo] Don't update back to the browser where update the angular object Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/0fde27fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/0fde27fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/0fde27fd Branch: refs/heads/master Commit: 0fde27fda6fdc4c76165925cd3060a59eace1c8a Parents: 7d133b6 Author: Lee moon soo <[email protected]> Authored: Sat Nov 14 20:39:54 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Sun Nov 15 15:41:55 2015 +0900 ---------------------------------------------------------------------- .../apache/zeppelin/socket/NotebookServer.java | 32 +++++- .../interpreter/mock/MockInterpreter1.java | 73 +++++++++++++ .../zeppelin/socket/NotebookServerTest.java | 102 ++++++++++++++++++- 3 files changed, 201 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0fde27fd/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index f295a8e..e987461 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -264,6 +264,26 @@ public class NotebookServer extends WebSocketServlet implements } } + private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) { + synchronized (noteSocketMap) { + List<NotebookSocket> socketLists = noteSocketMap.get(noteId); + if (socketLists == null || socketLists.size() == 0) { + return; + } + LOG.debug("SEND >> " + m.op); + for (NotebookSocket conn : socketLists) { + if (exclude.equals(conn)) { + continue; + } + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } + } + } + } + private void broadcastAll(Message m) { synchronized (connectedSockets) { for (NotebookSocket conn : connectedSockets) { @@ -498,7 +518,7 @@ public class NotebookServer extends WebSocketServlet implements * @param notebook the notebook. * @param fromMessage the message. */ - private void angularObjectUpdated(WebSocket conn, Notebook notebook, + private void angularObjectUpdated(NotebookSocket conn, Notebook notebook, Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); @@ -552,20 +572,22 @@ public class NotebookServer extends WebSocketServlet implements if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); - this.broadcast( + this.broadcastExcept( n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id())); + .put("noteId", n.id()), + conn); } } } } else { // broadcast to all web session for the note - this.broadcast( + this.broadcastExcept( note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + .put("noteId", note.id()), + conn); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0fde27fd/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java new file mode 100644 index 0000000..b76a8b2 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.mock; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +public class MockInterpreter1 extends Interpreter{ + Map<String, Object> vars = new HashMap<String, Object>(); + + public MockInterpreter1(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode()); + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0fde27fd/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index c17809a..5275d81 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -20,16 +20,53 @@ package org.apache.zeppelin.socket; import static org.junit.Assert.*; + import java.io.IOException; + +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.rest.AbstractTestRestApi; +import org.apache.zeppelin.server.ZeppelinServer; +import org.apache.zeppelin.socket.Message.OP; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import com.google.gson.Gson; + import java.net.UnknownHostException; import java.net.InetAddress; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; + +import static org.mockito.Mockito.*; + /** * BASIC Zeppelin rest api tests */ -public class NotebookServerTest { +public class NotebookServerTest extends AbstractTestRestApi { + + + private static Notebook notebook; + private static NotebookServer notebookServer; + private static Gson gson; + + @BeforeClass + public static void init() throws Exception { + AbstractTestRestApi.startUp(); + gson = new Gson(); + notebook = ZeppelinServer.notebook; + notebookServer = ZeppelinServer.notebookServer; + } + + @AfterClass + public static void destroy() throws Exception { + AbstractTestRestApi.shutDown(); + } @Test public void checkOrigin() throws UnknownHostException { @@ -45,5 +82,68 @@ public class NotebookServerTest { NotebookServer server = new NotebookServer(); assertFalse(server.checkOrigin(new TestHttpServletRequest(), "http://evillocalhost:8080")); } + + @Test + public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException { + // create a notebook + Note note1 = notebook.createNote(); + + // get reference to interpreterGroup + InterpreterGroup interpreterGroup = null; + List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings(); + for (InterpreterSetting setting : settings) { + if (setting.getInterpreterGroup() == null) { + continue; + } + + interpreterGroup = setting.getInterpreterGroup(); + break; + } + + // add angularObject + interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId()); + + // create two sockets and open it + NotebookSocket sock1 = createWebSocket(); + NotebookSocket sock2 = createWebSocket(); + + assertEquals(sock1, sock1); + assertNotEquals(sock1, sock2); + + notebookServer.onOpen(sock1); + notebookServer.onOpen(sock2); + verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject + // open the same notebook from sockets + notebookServer.onMessage(sock1, gson.toJson(new Message(OP.GET_NOTE).put("id", note1.getId()))); + notebookServer.onMessage(sock2, gson.toJson(new Message(OP.GET_NOTE).put("id", note1.getId()))); + + reset(sock1); + reset(sock2); + + // update object from sock1 + notebookServer.onMessage(sock1, gson.toJson( + new Message(OP.ANGULAR_OBJECT_UPDATED) + .put("noteId", note1.getId()) + .put("name", "object1") + .put("value", "value1") + .put("interpreterGroupId", interpreterGroup.getId()))); + + + // expect object is broadcasted except for where the update is created + verify(sock1, times(0)).send(anyString()); + verify(sock2, times(1)).send(anyString()); + + notebook.removeNote(note1.getId()); + } + + private NotebookSocket createWebSocket() { + NotebookSocket sock = mock(NotebookSocket.class); + when(sock.getRequest()).thenReturn(createHttpServletRequest()); + return sock; + } + + private HttpServletRequest createHttpServletRequest() { + return mock(HttpServletRequest.class); + } }
