Repository: incubator-zeppelin Updated Branches: refs/heads/master e2122bc9c -> 85a2ad376
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java index a03655b..bbc3f06 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java @@ -72,4 +72,34 @@ public class ResourceSet extends LinkedList<Resource> { } return result; } + + public ResourceSet filterByNoteId(String noteId) { + ResourceSet result = new ResourceSet(); + for (Resource r : this) { + if (equals(r.getResourceId().getNoteId(), noteId)) { + result.add(r); + } + } + return result; + } + + public ResourceSet filterByParagraphId(String paragraphId) { + ResourceSet result = new ResourceSet(); + for (Resource r : this) { + if (equals(r.getResourceId().getParagraphId(), paragraphId)) { + result.add(r); + } + } + return result; + } + + private boolean equals(String a, String b) { + if (a == null && b == null) { + return true; + } else if (a != null && b != null) { + return a.equals(b); + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java new file mode 100644 index 0000000..2d14fd4 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +/** + * Well known resource names in ResourcePool + */ +public enum WellKnownResourceName { + ParagraphResult("zeppelin.paragraph.result"); // paragraph run result + + String name; + WellKnownResourceName(String name) { + this.name = name; + } + + public String toString() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 224433d..3a70caa 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -78,7 +78,9 @@ service RemoteInterpreterService { // get all resources in the interpreter process list<string> resoucePoolGetAll(); // get value of resource - binary resourceGet(1: string resourceName); + binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName); + // remove resource + bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName); void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string object); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 1db68ad..3826b90 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; public class MockInterpreterResourcePool extends Interpreter { @@ -61,9 +62,18 @@ public class MockInterpreterResourcePool extends Interpreter { public InterpreterResult interpret(String st, InterpreterContext context) { String[] stmt = st.split(" "); String cmd = stmt[0]; + String noteId = null; + String paragraphId = null; String name = null; if (stmt.length >= 2) { - name = stmt[1]; + String[] npn = stmt[1].split(":"); + if (npn.length == 3) { + noteId = npn[0]; + paragraphId = npn[1]; + name = npn[2]; + } else { + name = stmt[1]; + } } String value = null; if (stmt.length == 3) { @@ -73,11 +83,16 @@ public class MockInterpreterResourcePool extends Interpreter { ResourcePool resourcePool = context.getResourcePool(); Object ret = null; if (cmd.equals("put")) { - resourcePool.put(name, value); + resourcePool.put(noteId, paragraphId, name, value); } else if (cmd.equalsIgnoreCase("get")) { - ret = resourcePool.get(name).get(); + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (resource != null) { + ret = resourcePool.get(noteId, paragraphId, name).get(); + } else { + ret = ""; + } } else if (cmd.equals("remove")) { - ret = resourcePool.remove(name); + ret = resourcePool.remove(noteId, paragraphId, name); } else if (cmd.equals("getAll")) { ret = resourcePool.getAll(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index a99fde2..e49b437 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -136,12 +136,13 @@ public class DistributedResourcePoolTest { InterpreterResult ret; intp1.interpret("put key1 value1", context); intp2.interpret("put key2 value2", context); + int numInterpreterResult = 2; ret = intp1.interpret("getAll", context); - assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size()); + assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size()); ret = intp2.interpret("getAll", context); - assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size()); + assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size()); ret = intp1.interpret("get key1", context); assertEquals("value1", gson.fromJson(ret.message(), String.class)); @@ -201,4 +202,44 @@ public class DistributedResourcePoolTest { assertEquals("value1", pool1.getAll().get(0).get()); assertEquals("value2", pool1.getAll().get(1).get()); } + + @Test + public void testResourcePoolUtils() { + Gson gson = new Gson(); + InterpreterResult ret; + + // when create some resources + intp1.interpret("put note1:paragraph1:key1 value1", context); + intp1.interpret("put note1:paragraph2:key1 value2", context); + intp2.interpret("put note2:paragraph1:key1 value1", context); + intp2.interpret("put note2:paragraph2:key2 value2", context); + + int numInterpreterResult = 2; + + // then get all resources. + assertEquals(numInterpreterResult + 4, ResourcePoolUtils.getAllResources().size()); + + // when remove all resources from note1 + ResourcePoolUtils.removeResourcesBelongsToNote("note1"); + + // then resources should be removed. + assertEquals(numInterpreterResult + 2, ResourcePoolUtils.getAllResources().size()); + assertEquals("", gson.fromJson( + intp1.interpret("get note1:paragraph1:key1", context).message(), + String.class)); + assertEquals("", gson.fromJson( + intp1.interpret("get note1:paragraph2:key1", context).message(), + String.class)); + + + // when remove all resources from note2:paragraph1 + ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1"); + + // then 1 + assertEquals(numInterpreterResult + 1, ResourcePoolUtils.getAllResources().size()); + assertEquals("value2", gson.fromJson( + intp1.interpret("get note2:paragraph2:key2", context).message(), + String.class)); + + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 3221054..f0fa385 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -434,6 +434,8 @@ public class InterpreterFactory { angularObjectRegistry = new AngularObjectRegistry( id, angularObjectRegistryListener); + + // TODO(moon) : create distributed resource pool for local interpreters and set } interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index b0470c8..6a09735 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.utility.IdHashes; +import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -208,6 +209,8 @@ public class Note implements Serializable, JobListener { * @return a paragraph that was deleted, or <code>null</code> otherwise */ public Paragraph removeParagraph(String paragraphId) { + removeAllAngularObjectInParagraph(paragraphId); + ResourcePoolUtils.removeResourcesBelongsToParagraph(id(), paragraphId); synchronized (paragraphs) { Iterator<Paragraph> i = paragraphs.iterator(); while (i.hasNext()) { @@ -220,7 +223,7 @@ public class Note implements Serializable, JobListener { } } - removeAllAngularObjectInParagraph(paragraphId); + return null; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 8a14b87..4827bff 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -40,6 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; +import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.SearchService; import org.quartz.CronScheduleBuilder; @@ -307,6 +308,8 @@ public class Notebook { } } + ResourcePoolUtils.removeResourcesBelongsToNote(id); + try { note.unpersist(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java index 079846c..cf0a613 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java @@ -45,13 +45,20 @@ public class MockInterpreter1 extends Interpreter{ @Override public InterpreterResult interpret(String st, InterpreterContext context) { + InterpreterResult result; if ("getId".equals(st)) { // get unique id of this interpreter instance - return new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); } else { - return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); } + + if (context.getResourcePool() != null) { + context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result); + } + + return result; } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java index dd465a5..bae4b8d 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java @@ -45,7 +45,19 @@ public class MockInterpreter2 extends Interpreter{ @Override public InterpreterResult interpret(String st, InterpreterContext context) { - return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: "+st); + InterpreterResult result; + + if ("getId".equals(st)) { + // get unique id of this interpreter instance + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + } else { + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st); + } + + if (context.getResourcePool() != null) { + context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result); + } + return result; } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 06889d6..3c89c35 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -326,6 +328,33 @@ public class NotebookTest implements JobListenerFactory{ } @Test + public void testResourceRemovealOnParagraphNoteRemove() throws IOException { + Note note = notebook.createNote(); + note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList()); + for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { + intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId())); + } + Paragraph p1 = note.addParagraph(); + p1.setText("hello"); + Paragraph p2 = note.addParagraph(); + p2.setText("%mock2 world"); + + note.runAll(); + while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield(); + while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield(); + + assertEquals(2, ResourcePoolUtils.getAllResources().size()); + + // remove a paragraph + note.removeParagraph(p1.getId()); + assertEquals(1, ResourcePoolUtils.getAllResources().size()); + + // remove note + notebook.removeNote(note.id()); + assertEquals(0, ResourcePoolUtils.getAllResources().size()); + } + + @Test public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedException, IOException { // create a note and a paragraph
