Repository: incubator-zeppelin Updated Branches: refs/heads/master 2714d28b5 -> 882cdead7
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 65fd0a7..5cd14a2 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -67,7 +67,8 @@ service RemoteInterpreterService { string getStatus(1:string jobId); RemoteInterpreterEvent getEvent(); - void angularObjectUpdate(1: string name, 2: string noteId, 3: string object); - void angularObjectAdd(1: string name, 2: string noteId, 3: string object); - void angularObjectRemove(1: string name, 2: string noteId); + void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string + object); + void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object); + void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java index 43aca62..2d0436f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.display; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.util.concurrent.atomic.AtomicInteger; @@ -45,32 +47,68 @@ public class AngularObjectRegistryTest { } @Override - public void onRemove(String interpreterGroupId, String name, String noteId) { + public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { onRemove.incrementAndGet(); } }); - registry.add("name1", "value1", "note1"); - assertEquals(1, registry.getAll("note1").size()); + registry.add("name1", "value1", "note1", null); + assertEquals(1, registry.getAll("note1", null).size()); assertEquals(1, onAdd.get()); assertEquals(0, onUpdate.get()); - registry.get("name1", "note1").set("newValue"); + registry.get("name1", "note1", null).set("newValue"); assertEquals(1, onUpdate.get()); - registry.remove("name1", "note1"); - assertEquals(0, registry.getAll("note1").size()); + registry.remove("name1", "note1", null); + assertEquals(0, registry.getAll("note1", null).size()); assertEquals(1, onRemove.get()); - assertEquals(null, registry.get("name1", "note1")); + assertEquals(null, registry.get("name1", "note1", null)); // namespace - registry.add("name1", "value11", "note2"); - assertEquals("value11", registry.get("name1", "note2").get()); - assertEquals(null, registry.get("name1", "note1")); + registry.add("name1", "value11", "note2", null); + assertEquals("value11", registry.get("name1", "note2", null).get()); + assertEquals(null, registry.get("name1", "note1", null)); // null namespace - registry.add("name1", "global1", null); - assertEquals("global1", registry.get("name1", null).get()); + registry.add("name1", "global1", null, null); + assertEquals("global1", registry.get("name1", null, null).get()); } + + @Test + public void testGetDependOnScope() { + AngularObjectRegistry registry = new AngularObjectRegistry("intpId", null); + AngularObject ao1 = registry.add("name1", "o1", "noteId1", "paragraphId1"); + AngularObject ao2 = registry.add("name2", "o2", "noteId1", "paragraphId1"); + AngularObject ao3 = registry.add("name2", "o3", "noteId1", "paragraphId2"); + AngularObject ao4 = registry.add("name3", "o4", "noteId1", null); + AngularObject ao5 = registry.add("name4", "o5", null, null); + + + assertNull(registry.get("name3", "noteId1", "paragraphId1")); + assertNull(registry.get("name1", "noteId2", null)); + assertEquals("o1", registry.get("name1", "noteId1", "paragraphId1").get()); + assertEquals("o2", registry.get("name2", "noteId1", "paragraphId1").get()); + assertEquals("o3", registry.get("name2", "noteId1", "paragraphId2").get()); + assertEquals("o4", registry.get("name3", "noteId1", null).get()); + assertEquals("o5", registry.get("name4", null, null).get()); + } + + @Test + public void testGetAllDependOnScope() { + AngularObjectRegistry registry = new AngularObjectRegistry("intpId", null); + AngularObject ao1 = registry.add("name1", "o", "noteId1", "paragraphId1"); + AngularObject ao2 = registry.add("name2", "o", "noteId1", "paragraphId1"); + AngularObject ao3 = registry.add("name2", "o", "noteId1", "paragraphId2"); + AngularObject ao4 = registry.add("name3", "o", "noteId1", null); + AngularObject ao5 = registry.add("name4", "o", null, null); + + assertEquals(2, registry.getAll("noteId1", "paragraphId1").size()); + assertEquals(1, registry.getAll("noteId1", "paragraphId2").size()); + assertEquals(1, registry.getAll("noteId1", null).size()); + assertEquals(1, registry.getAll(null, null).size()); + assertEquals(5, registry.getAllWithGlobal("noteId1").size()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java index acb93d0..924c5d4 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.display; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import java.util.concurrent.atomic.AtomicInteger; @@ -27,9 +28,59 @@ import org.junit.Test; public class AngularObjectTest { @Test + public void testEquals() { + assertEquals( + new AngularObject("name", "value", "note1", null, null), + new AngularObject("name", "value", "note1", null, null) + ); + + assertEquals( + new AngularObject("name", "value", "note1", "paragraph1", null), + new AngularObject("name", "value", "note1", "paragraph1", null) + ); + + assertEquals( + new AngularObject("name", "value", null, null, null), + new AngularObject("name", "value", null, null, null) + ); + + assertEquals( + new AngularObject("name", "value1", null, null, null), + new AngularObject("name", "value2", null, null, null) + ); + + assertNotSame( + new AngularObject("name1", "value", null, null, null), + new AngularObject("name2", "value", null, null, null) + ); + + assertNotSame( + new AngularObject("name1", "value", "note1", null, null), + new AngularObject("name2", "value", "note2", null, null) + ); + + assertNotSame( + new AngularObject("name1", "value", "note", null, null), + new AngularObject("name2", "value", null, null, null) + ); + + assertNotSame( + new AngularObject("name", "value", "note", "paragraph1", null), + new AngularObject("name", "value", "note", "paragraph2", null) + ); + + assertNotSame( + new AngularObject("name", "value", "note1", null, null), + new AngularObject("name", "value", "note1", "paragraph1", null) + ); + + + } + + @Test public void testListener() { final AtomicInteger updated = new AtomicInteger(0); - AngularObject ao = new AngularObject("name", "value", "note1", new AngularObjectListener() { + AngularObject ao = new AngularObject("name", "value", "note1", null, new AngularObjectListener() { @Override public void updated(AngularObject updatedObject) { @@ -55,7 +106,7 @@ public class AngularObjectTest { public void testWatcher() throws InterruptedException { final AtomicInteger updated = new AtomicInteger(0); final AtomicInteger onWatch = new AtomicInteger(0); - AngularObject ao = new AngularObject("name", "value", "note1", new AngularObjectListener() { + AngularObject ao = new AngularObject("name", "value", "note1", null, new AngularObjectListener() { @Override public void updated(AngularObject updatedObject) { updated.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 906878d..4cd974d 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -110,7 +110,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { result = ret.message().split(" "); assertEquals("1", result[0]); // size of registry assertEquals("0", result[1]); // num watcher called - assertEquals("v1", localRegistry.get("n1", "note").get()); + assertEquals("v1", localRegistry.get("n1", "note", null).get()); // update object ret = intp.interpret("update n1 v11", context); @@ -118,7 +118,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { Thread.sleep(500); assertEquals("1", result[0]); // size of registry assertEquals("1", result[1]); // num watcher called - assertEquals("v11", localRegistry.get("n1", "note").get()); + assertEquals("v11", localRegistry.get("n1", "note", null).get()); // remove object ret = intp.interpret("remove n1", context); @@ -126,7 +126,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { Thread.sleep(500); assertEquals("0", result[0]); // size of registry assertEquals("1", result[1]); // num watcher called - assertEquals(null, localRegistry.get("n1", "note")); + assertEquals(null, localRegistry.get("n1", "note", null)); } @Test @@ -144,10 +144,10 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { Thread.sleep(500); result = ret.message().split(" "); assertEquals("1", result[0]); // size of registry - assertEquals("v1", localRegistry.get("n1", "note").get()); + assertEquals("v1", localRegistry.get("n1", "note", null).get()); // remove object in local registry. - localRegistry.removeAndNotifyRemoteProcess("n1", "note"); + localRegistry.removeAndNotifyRemoteProcess("n1", "note", null); ret = intp.interpret("get", context); Thread.sleep(500); // waitFor eventpoller pool event result = ret.message().split(" "); @@ -165,7 +165,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { assertEquals("0", result[0]); // size of registry // create object - localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note"); + localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); // get from remote registry ret = intp.interpret("get", context); @@ -185,7 +185,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { } @Override - public void onRemove(String interpreterGroupId, String name, String noteId) { + public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { onRemove.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java index 7c1a2f0..2f448f2 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java @@ -71,8 +71,9 @@ public class MockInterpreterAngular extends Interpreter { AngularObjectRegistry registry = context.getAngularObjectRegistry(); if (cmd.equals("add")) { - registry.add(name, value, context.getNoteId()); - registry.get(name, context.getNoteId()).addWatcher(new AngularObjectWatcher(null) { + registry.add(name, value, context.getNoteId(), null); + registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher + (null) { @Override public void watch(Object oldObject, Object newObject, @@ -82,9 +83,9 @@ public class MockInterpreterAngular extends Interpreter { }); } else if (cmd.equalsIgnoreCase("update")) { - registry.get(name, context.getNoteId()).set(value); + registry.get(name, context.getNoteId(), null).set(value); } else if (cmd.equals("remove")) { - registry.remove(name, context.getNoteId()); + registry.remove(name, context.getNoteId(), null); } try { @@ -93,7 +94,8 @@ public class MockInterpreterAngular extends Interpreter { logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e); } - String msg = registry.getAll(context.getNoteId()).size() + " " + Integer.toString(numWatch.get()); + String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch + .get()); return new InterpreterResult(Code.SUCCESS, msg); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 11fa7f1..2c8a7c8 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -561,6 +561,7 @@ public class NotebookServer extends WebSocketServlet implements private void angularObjectUpdated(NotebookSocket conn, Notebook notebook, Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); + String paragraphId = (String) fromMessage.get("paragraphId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); String varName = (String) fromMessage.get("name"); Object varValue = fromMessage.get("value"); @@ -579,19 +580,26 @@ public class NotebookServer extends WebSocketServlet implements AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); // first trying to get local registry - ao = angularObjectRegistry.get(varName, noteId); + ao = angularObjectRegistry.get(varName, noteId, paragraphId); if (ao == null) { - // then try global registry - ao = angularObjectRegistry.get(varName, null); + // then try notebook scope registry + ao = angularObjectRegistry.get(varName, noteId, null); if (ao == null) { - LOG.warn("Object {} is not binded", varName); + // then try global scope registry + ao = angularObjectRegistry.get(varName, null, null); + if (ao == null) { + LOG.warn("Object {} is not binded", varName); + } else { + // path from client -> server + ao.set(varValue, false); + global = true; + } } else { // path from client -> server ao.set(varValue, false); - global = true; + global = false; } } else { - // path from client -> server ao.set(varValue, false); global = false; } @@ -616,7 +624,8 @@ public class NotebookServer extends WebSocketServlet implements n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id()), + .put("noteId", n.id()) + .put("paragraphId", ao.getParagraphId()), conn); } } @@ -626,7 +635,8 @@ public class NotebookServer extends WebSocketServlet implements note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id()), + .put("noteId", note.id()) + .put("paragraphId", ao.getParagraphId()), conn); } } @@ -837,7 +847,9 @@ public class NotebookServer extends WebSocketServlet implements .put("angularObject", object) .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId()) - .put("noteId", note.id()))); + .put("noteId", note.id()) + .put("paragraphId", object.getParagraphId()) + )); } } } @@ -871,14 +883,15 @@ public class NotebookServer extends WebSocketServlet implements new Message(OP.ANGULAR_OBJECT_UPDATE) .put("angularObject", object) .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + .put("noteId", note.id()) + .put("paragraphId", object.getParagraphId())); } } } } @Override - public void onRemove(String interpreterGroupId, String name, String noteId) { + public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { Notebook notebook = notebook(); List<Note> notes = notebook.getAllNotes(); for (Note note : notes) { @@ -892,7 +905,7 @@ public class NotebookServer extends WebSocketServlet implements broadcast( note.id(), new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put( - "noteId", noteId)); + "noteId", noteId).put("paragraphId", paragraphId)); } } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 8ec7bdd..7d8f3cf 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -45,8 +45,6 @@ import static org.mockito.Mockito.*; * BASIC Zeppelin rest api tests */ public class NotebookServerTest extends AbstractTestRestApi { - - private static Notebook notebook; private static NotebookServer notebookServer; private static Gson gson; @@ -97,7 +95,7 @@ public class NotebookServerTest extends AbstractTestRestApi { } // add angularObject - interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId()); + interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null); // create two sockets and open it NotebookSocket sock1 = createWebSocket(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-web/src/app/app.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/app.controller.js b/zeppelin-web/src/app/app.controller.js index 2c302b5..ce466a7 100644 --- a/zeppelin-web/src/app/app.controller.js +++ b/zeppelin-web/src/app/app.controller.js @@ -14,7 +14,6 @@ 'use strict'; angular.module('zeppelinWebApp').controller('MainCtrl', function($scope, $rootScope, $window) { - $rootScope.compiledScope = $scope.$new(true, $rootScope); $scope.looknfeel = 'default'; var init = function() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-web/src/app/notebook/notebook.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/notebook/notebook.controller.js b/zeppelin-web/src/app/notebook/notebook.controller.js index e10d725..5919e4e 100644 --- a/zeppelin-web/src/app/notebook/notebook.controller.js +++ b/zeppelin-web/src/app/notebook/notebook.controller.js @@ -41,7 +41,6 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', $scope.isNoteDirty = null; $scope.saveTimer = null; - var angularObjectRegistry = {}; var connectedOnce = false; $scope.$on('setConnectedStatus', function(event, param) { @@ -625,52 +624,4 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', return true; } }; - - $scope.$on('angularObjectUpdate', function(event, data) { - if (data.noteId === $scope.note.id) { - var scope = $rootScope.compiledScope; - var varName = data.angularObject.name; - - if (angular.equals(data.angularObject.object, scope[varName])) { - // return when update has no change - return; - } - - if (!angularObjectRegistry[varName]) { - angularObjectRegistry[varName] = { - interpreterGroupId : data.interpreterGroupId, - }; - } - - angularObjectRegistry[varName].skipEmit = true; - - if (!angularObjectRegistry[varName].clearWatcher) { - angularObjectRegistry[varName].clearWatcher = scope.$watch(varName, function(newValue, oldValue) { - if (angularObjectRegistry[varName].skipEmit) { - angularObjectRegistry[varName].skipEmit = false; - return; - } - websocketMsgSrv.updateAngularObject($routeParams.noteId, varName, newValue, angularObjectRegistry[varName].interpreterGroupId); - }); - } - scope[varName] = data.angularObject.object; - } - }); - - $scope.$on('angularObjectRemove', function(event, data) { - if (!data.noteId || data.noteId === $scope.note.id) { - var scope = $rootScope.compiledScope; - var varName = data.name; - - // clear watcher - if (angularObjectRegistry[varName]) { - angularObjectRegistry[varName].clearWatcher(); - angularObjectRegistry[varName] = undefined; - } - - // remove scope variable - scope[varName] = undefined; - } - }); - }); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index c69f961..0af25fd 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -17,10 +17,12 @@ angular.module('zeppelinWebApp') .controller('ParagraphCtrl', function($scope,$rootScope, $route, $window, $element, $routeParams, $location, $timeout, $compile, websocketMsgSrv) { - + var ANGULAR_FUNCTION_OBJECT_NAME_PREFIX = '_Z_ANGULAR_FUNC_'; $scope.paragraph = null; $scope.originalText = ''; $scope.editor = null; + var paragraphScope = $rootScope.$new(true, $rootScope); + var angularObjectRegistry = {}; var editorModes = { 'ace/mode/scala': /^%spark/, @@ -84,7 +86,7 @@ angular.module('zeppelinWebApp') try { angular.element('#p'+$scope.paragraph.id+'_angular').html($scope.paragraph.result.msg); - $compile(angular.element('#p'+$scope.paragraph.id+'_angular').contents())($rootScope.compiledScope); + $compile(angular.element('#p'+$scope.paragraph.id+'_angular').contents())(paragraphScope); } catch(err) { console.log('ANGULAR rendering error %o', err); } @@ -132,6 +134,85 @@ angular.module('zeppelinWebApp') + $scope.$on('angularObjectUpdate', function(event, data) { + var noteId = $route.current.pathParams.noteId; + if (!data.noteId || (data.noteId === noteId && (!data.paragraphId || data.paragraphId === $scope.paragraph.id))) { + var scope = paragraphScope; + var varName = data.angularObject.name; + + if (angular.equals(data.angularObject.object, scope[varName])) { + // return when update has no change + return; + } + + if (!angularObjectRegistry[varName]) { + angularObjectRegistry[varName] = { + interpreterGroupId : data.interpreterGroupId, + noteId : data.noteId, + paragraphId : data.paragraphId + }; + } else { + angularObjectRegistry[varName].noteId = angularObjectRegistry[varName].noteId || data.noteId; + angularObjectRegistry[varName].paragraphId = angularObjectRegistry[varName].paragraphId || data.paragraphId; + } + + angularObjectRegistry[varName].skipEmit = true; + + if (!angularObjectRegistry[varName].clearWatcher) { + angularObjectRegistry[varName].clearWatcher = scope.$watch(varName, function(newValue, oldValue) { + console.log('angular object (paragraph) updated %o %o', varName, angularObjectRegistry[varName]); + if (angularObjectRegistry[varName].skipEmit) { + angularObjectRegistry[varName].skipEmit = false; + return; + } + websocketMsgSrv.updateAngularObject( + angularObjectRegistry[varName].noteId, + angularObjectRegistry[varName].paragraphId, + varName, + newValue, + angularObjectRegistry[varName].interpreterGroupId); + }); + } + console.log('angular object (paragraph) created %o', varName); + scope[varName] = data.angularObject.object; + + // create proxy for AngularFunction + if (varName.startsWith(ANGULAR_FUNCTION_OBJECT_NAME_PREFIX)) { + var funcName = varName.substring((ANGULAR_FUNCTION_OBJECT_NAME_PREFIX).length); + scope[funcName] = function() { + scope[varName] = arguments; + console.log('angular function (paragraph) invoked %o', arguments); + }; + + console.log('angular function (paragraph) created %o', scope[funcName]); + } + } + }); + + + $scope.$on('angularObjectRemove', function(event, data) { + var noteId = $route.current.pathParams.noteId; + if (!data.noteId || (data.noteId === noteId && (!data.paragraphId || data.paragraphId === $scope.paragraph.id))) { + var scope = paragraphScope; + var varName = data.name; + + // clear watcher + if (angularObjectRegistry[varName]) { + angularObjectRegistry[varName].clearWatcher(); + angularObjectRegistry[varName] = undefined; + } + + // remove scope variable + scope[varName] = undefined; + + // remove proxy for AngularFunction + if (varName.startsWith(ANGULAR_FUNCTION_OBJECT_NAME_PREFIX)) { + var funcName = varName.substring((ANGULAR_FUNCTION_OBJECT_NAME_PREFIX).length); + scope[funcName] = undefined; + } + } + }); + var initializeDefault = function() { var config = $scope.paragraph.config; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index b8f2204..df44010 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -57,11 +57,12 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, websocketEvents.sendNewEvent({ op: 'INSERT_PARAGRAPH', data : {index: newIndex}}); }, - updateAngularObject: function(noteId, name, value, interpreterGroupId) { + updateAngularObject: function(noteId, paragraphId, name, value, interpreterGroupId) { websocketEvents.sendNewEvent({ op: 'ANGULAR_OBJECT_UPDATED', data: { noteId: noteId, + paragraphId: paragraphId, name: name, value: value, interpreterGroupId: interpreterGroupId http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-web/test/spec/controllers/paragraph.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/test/spec/controllers/paragraph.js b/zeppelin-web/test/spec/controllers/paragraph.js index 25716c1..7cdf748 100644 --- a/zeppelin-web/test/spec/controllers/paragraph.js +++ b/zeppelin-web/test/spec/controllers/paragraph.js @@ -13,6 +13,8 @@ describe('Controller: ParagraphCtrl', function() { beforeEach(inject(function($controller, $rootScope) { scope = $rootScope.$new(); + $rootScope.notebookScope = $rootScope.$new(true, $rootScope); + ParagraphCtrl = $controller('ParagraphCtrl', { $scope: scope, websocketMsgSrv: websocketMsgSrvMock, @@ -79,4 +81,4 @@ describe('Controller: ParagraphCtrl', function() { expect(scope.renderAngular).toHaveBeenCalled(); }); -}); \ No newline at end of file +}); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 10f080d..27e2f77 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -218,6 +218,8 @@ public class Note implements Serializable, JobListener { } } } + + removeAllAngularObjectInParagraph(paragraphId); return null; } @@ -400,6 +402,21 @@ public class Note implements Serializable, JobListener { } } + private void removeAllAngularObjectInParagraph(String paragraphId) { + angularObjects = new HashMap<String, List<AngularObject>>(); + + List<InterpreterSetting> settings = replLoader.getInterpreterSettings(); + if (settings == null || settings.size() == 0) { + return; + } + + for (InterpreterSetting setting : settings) { + InterpreterGroup intpGroup = setting.getInterpreterGroup(); + AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); + registry.removeAll(id, paragraphId); + } + } + public void persist() throws IOException { stopDelayedPersistTimer(); snapshotAngularObjectRegistry(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index e58df0d..cae4210 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -284,9 +284,9 @@ public class Notebook { for (InterpreterSetting settings : replFactory.get()) { AngularObjectRegistry registry = settings.getInterpreterGroup().getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { - ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id); + ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null); } else { - registry.removeAll(id); + registry.removeAll(id, null); } } @@ -360,12 +360,13 @@ public class Notebook { if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); String noteId = snapshot.getAngularObject().getNoteId(); + String paragraphId = snapshot.getAngularObject().getParagraphId(); // at this point, remote interpreter process is not created. // so does not make sense add it to the remote. // // therefore instead of addAndNotifyRemoteProcess(), need to use add() // that results add angularObject only in ZeppelinServer side not remoteProcessSide - registry.add(name, snapshot.getAngularObject().get(), noteId); + registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId); } } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/882cdead/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 82ba137..506b682 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -321,17 +321,17 @@ public class NotebookTest implements JobListenerFactory{ .getAngularObjectRegistry(); // add local scope object - registry.add("o1", "object1", note.id()); + registry.add("o1", "object1", note.id(), null); // add global scope object - registry.add("o2", "object2", null); + registry.add("o2", "object2", null, null); // remove notebook notebook.removeNote(note.id()); // local object should be removed - assertNull(registry.get("o1", note.id())); + assertNull(registry.get("o1", note.id(), null)); // global object sould be remained - assertNotNull(registry.get("o2", null)); + assertNotNull(registry.get("o2", null, null)); } @Test @@ -346,9 +346,9 @@ public class NotebookTest implements JobListenerFactory{ .getAngularObjectRegistry(); // add local scope object - registry.add("o1", "object1", note.id()); + registry.add("o1", "object1", note.id(), null); // add global scope object - registry.add("o2", "object2", null); + registry.add("o2", "object2", null, null); // restart interpreter factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id()); @@ -357,8 +357,8 @@ public class NotebookTest implements JobListenerFactory{ .getAngularObjectRegistry(); // local and global scope object should be removed - assertNull(registry.get("o1", note.id())); - assertNull(registry.get("o2", null)); + assertNull(registry.get("o1", note.id(), null)); + assertNull(registry.get("o2", null, null)); notebook.removeNote(note.id()); }
