Repository: incubator-zeppelin Updated Branches: refs/heads/master 738c10e21 -> b88f52e3c
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index af0c2f6..33a3ca6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -45,14 +45,16 @@ public class RemoteScheduler implements Scheduler { boolean terminate = false; private String name; private int maxConcurrency; + private final String noteId; private RemoteInterpreterProcess interpreterProcess; - public RemoteScheduler(String name, ExecutorService executor, + public RemoteScheduler(String name, ExecutorService executor, String noteId, RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, int maxConcurrency) { this.name = name; this.executor = executor; this.listener = listener; + this.noteId = noteId; this.interpreterProcess = interpreterProcess; this.maxConcurrency = maxConcurrency; } @@ -257,7 +259,7 @@ public class RemoteScheduler implements Scheduler { boolean broken = false; try { - String statusStr = client.getStatus(job.getId()); + String statusStr = client.getStatus(noteId, job.getId()); if ("Unknown".equals(statusStr)) { // not found this job in the remote schedulers. // maybe not submitted, maybe already finished http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 8b47145..20b4b8a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -86,6 +86,7 @@ public class SchedulerFactory implements SchedulerListener { public Scheduler createOrGetRemoteScheduler( String name, + String noteId, RemoteInterpreterProcess interpreterProcess, int maxConcurrency) { @@ -94,6 +95,7 @@ public class SchedulerFactory implements SchedulerListener { Scheduler s = new RemoteScheduler( name, executor, + noteId, interpreterProcess, this, maxConcurrency); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index d288324..224433d 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -56,18 +56,18 @@ struct RemoteInterpreterEvent { } service RemoteInterpreterService { - void createInterpreter(1: string intpGroupId, 2: string className, 3: map<string, string> properties); + void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties); - void open(1: string className); - void close(1: string className); - RemoteInterpreterResult interpret(1: string className, 2: string st, 3: RemoteInterpreterContext interpreterContext); - void cancel(1: string className, 2: RemoteInterpreterContext interpreterContext); - i32 getProgress(1: string className, 2: RemoteInterpreterContext interpreterContext); - string getFormType(1: string className); - list<string> completion(1: string className, 2: string buf, 3: i32 cursor); + void open(1: string noteId, 2: string className); + void close(1: string noteId, 2: string className); + RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext); + void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext); + i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext); + string getFormType(1: string noteId, 2: string className); + list<string> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor); void shutdown(); - string getStatus(1:string jobId); + string getStatus(1: string noteId, 2:string jobId); RemoteInterpreterEvent getEvent(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index bd8f436..84327dd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -61,6 +61,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { intp = new RemoteInterpreter( p, + "note", MockInterpreterAngular.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -70,7 +71,8 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { null ); - intpGroup.add(intp); + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intp); intp.setInterpreterGroup(intpGroup); context = new InterpreterContext( http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index c52055c..3fbf5bc 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -44,6 +44,8 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce @Before public void setUp() throws Exception { intpGroup = new InterpreterGroup(); + intpGroup.put("note", new LinkedList<Interpreter>()); + env = new HashMap<String, String>(); env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); } @@ -57,6 +59,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce private RemoteInterpreter createMockInterpreter() { RemoteInterpreter intp = new RemoteInterpreter( new Properties(), + "note", MockInterpreterOutputStream.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -65,7 +68,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce 10 * 1000, this); - intpGroup.add(intp); + intpGroup.get("note").add(intp); intp.setInterpreterGroup(intpGroup); return intp; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 333e4b4..182b7a2 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -63,8 +63,13 @@ public class RemoteInterpreterTest { } private RemoteInterpreter createMockInterpreterA(Properties p) { + return createMockInterpreterA(p, "note"); + } + + private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) { return new RemoteInterpreter( p, + noteId, MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -75,8 +80,13 @@ public class RemoteInterpreterTest { } private RemoteInterpreter createMockInterpreterB(Properties p) { + return createMockInterpreterB(p, "note"); + } + + private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) { return new RemoteInterpreter( p, + noteId, MockInterpreterB.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -89,15 +99,17 @@ public class RemoteInterpreterTest { @Test public void testRemoteInterperterCall() throws TTransportException, IOException { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); RemoteInterpreter intpB = createMockInterpreterB(p); - intpGroup.add(intpB); + intpGroup.get("note").add(intpB); intpB.setInterpreterGroup(intpGroup); @@ -108,10 +120,10 @@ public class RemoteInterpreterTest { assertEquals(0, process.getNumIdleClient()); assertEquals(0, process.referenceCount()); - intpA.open(); + intpA.open(); // initializa all interpreters in the same group assertTrue(process.isRunning()); assertEquals(1, process.getNumIdleClient()); - assertEquals(1, process.referenceCount()); + assertEquals(2, process.referenceCount()); intpA.interpret("1", new InterpreterContext( @@ -144,7 +156,8 @@ public class RemoteInterpreterTest { RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); intpA.open(); @@ -167,9 +180,11 @@ public class RemoteInterpreterTest { @Test public void testRemoteSchedulerSharing() throws TTransportException, IOException { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); RemoteInterpreter intpA = new RemoteInterpreter( p, + "note", MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -178,11 +193,13 @@ public class RemoteInterpreterTest { 10 * 1000, null); - intpGroup.add(intpA); + + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); RemoteInterpreter intpB = new RemoteInterpreter( p, + "note", MockInterpreterB.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -191,7 +208,7 @@ public class RemoteInterpreterTest { 10 * 1000, null); - intpGroup.add(intpB); + intpGroup.get("note").add(intpB); intpB.setInterpreterGroup(intpGroup); intpA.open(); @@ -236,15 +253,16 @@ public class RemoteInterpreterTest { @Test public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); final RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); final RemoteInterpreter intpB = createMockInterpreterB(p); - intpGroup.add(intpB); + intpGroup.get("note").add(intpB); intpB.setInterpreterGroup(intpGroup); intpA.open(); @@ -322,13 +340,11 @@ public class RemoteInterpreterTest { }; intpB.getScheduler().submit(jobB); - // wait until both job finished while (jobA.getStatus() != Status.FINISHED || jobB.getStatus() != Status.FINISHED) { Thread.sleep(100); } - long end = System.currentTimeMillis(); assertTrue(end - start >= 1000); @@ -341,10 +357,11 @@ public class RemoteInterpreterTest { @Test public void testRunOrderPreserved() throws InterruptedException { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); final RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); intpA.open(); @@ -417,10 +434,11 @@ public class RemoteInterpreterTest { public void testRunParallel() throws InterruptedException { Properties p = new Properties(); p.put("parallel", "true"); + intpGroup.put("note", new LinkedList<Interpreter>()); final RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); intpA.open(); @@ -507,6 +525,7 @@ public class RemoteInterpreterTest { @Test public void testInterpreterGroupResetAfterProcessFinished() { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); RemoteInterpreter intpA = createMockInterpreterA(p); @@ -525,10 +544,11 @@ public class RemoteInterpreterTest { @Test public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); final RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); intpA.open(); @@ -577,7 +597,12 @@ public class RemoteInterpreterTest { // restart interpreter RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); intpA.close(); - intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + + InterpreterGroup newInterpreterGroup = + new InterpreterGroup(intpA.getInterpreterGroup().getId()); + newInterpreterGroup.put("note", new LinkedList<Interpreter>()); + + intpA.setInterpreterGroup(newInterpreterGroup); intpA.open(); RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); @@ -588,15 +613,16 @@ public class RemoteInterpreterTest { @Test public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); RemoteInterpreter intpA = createMockInterpreterA(p); - intpGroup.add(intpA); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); RemoteInterpreter intpB = createMockInterpreterB(p); - intpGroup.add(intpB); + intpGroup.get("note").add(intpB); intpB.setInterpreterGroup(intpGroup); intpA.open(); @@ -604,4 +630,38 @@ public class RemoteInterpreterTest { assertEquals(intpA.getScheduler(), intpB.getScheduler()); } + + @Test + public void testMultiInterpreterSession() { + Properties p = new Properties(); + intpGroup.put("sessionA", new LinkedList<Interpreter>()); + intpGroup.put("sessionB", new LinkedList<Interpreter>()); + + RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA"); + intpGroup.get("sessionA").add(intpAsessionA); + intpAsessionA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA"); + intpGroup.get("sessionA").add(intpBsessionA); + intpBsessionA.setInterpreterGroup(intpGroup); + + intpAsessionA.open(); + intpBsessionA.open(); + + assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler()); + + RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB"); + intpGroup.get("sessionB").add(intpAsessionB); + intpAsessionB.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB"); + intpGroup.get("sessionB").add(intpBsessionB); + intpBsessionB.setInterpreterGroup(intpGroup); + + intpAsessionB.open(); + intpBsessionB.open(); + + assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler()); + assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler()); + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java index c7097f2..fa6ff7e 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java @@ -91,13 +91,30 @@ public class MockInterpreterB extends Interpreter { public MockInterpreterA getInterpreterA() { InterpreterGroup interpreterGroup = getInterpreterGroup(); - for (Interpreter intp : interpreterGroup) { - if (intp.getClassName().equals(MockInterpreterA.class.getName())) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); + synchronized (interpreterGroup) { + for (List<Interpreter> interpreters : interpreterGroup.values()) { + boolean belongsToSameNoteGroup = false; + MockInterpreterA a = null; + for (Interpreter intp : interpreters) { + if (intp.getClassName().equals(MockInterpreterA.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + a = (MockInterpreterA) p; + } + + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + if (this == p) { + belongsToSameNoteGroup = true; + } + } + if (belongsToSameNoteGroup) { + return a; } - return (MockInterpreterA) p; } } return null; @@ -105,13 +122,10 @@ public class MockInterpreterB extends Interpreter { @Override public Scheduler getScheduler() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - for (Interpreter intp : interpreterGroup) { - if (intp.getClassName().equals(MockInterpreterA.class.getName())) { - return intp.getScheduler(); - } + MockInterpreterA intpA = getInterpreterA(); + if (intpA != null) { + return intpA.getScheduler(); } - return null; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index 138c1e4..a99fde2 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -58,6 +58,7 @@ public class DistributedResourcePoolTest { intp1 = new RemoteInterpreter( p, + "note", MockInterpreterResourcePool.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -68,11 +69,13 @@ public class DistributedResourcePoolTest { ); intpGroup1 = new InterpreterGroup("intpGroup1"); - intpGroup1.add(intp1); + intpGroup1.put("note", new LinkedList<Interpreter>()); + intpGroup1.get("note").add(intp1); intp1.setInterpreterGroup(intpGroup1); intp2 = new RemoteInterpreter( p, + "note", MockInterpreterResourcePool.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -83,7 +86,8 @@ public class DistributedResourcePoolTest { ); intpGroup2 = new InterpreterGroup("intpGroup2"); - intpGroup2.add(intp2); + intpGroup2.put("note", new LinkedList<Interpreter>()); + intpGroup2.get("note").add(intp2); intp2.setInterpreterGroup(intpGroup2); context = new InterpreterContext( http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 5acfcc1..9ce7a65 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -31,6 +31,7 @@ import java.util.Properties; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -68,6 +69,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { final RemoteInterpreter intpA = new RemoteInterpreter( p, + "note", MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -76,12 +78,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { 10 * 1000, this); - intpGroup.add(intpA); + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); intpA.open(); - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA.getInterpreterProcess(), 10); @@ -154,6 +157,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { final RemoteInterpreter intpA = new RemoteInterpreter( p, + "note", MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", @@ -162,12 +166,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { 10 * 1000, this); - intpGroup.add(intpA); + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); intpA.setInterpreterGroup(intpGroup); intpA.open(); - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA.getInterpreterProcess(), 10); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index 50d7d64..5bf42b4 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -94,11 +94,10 @@ public class InterpreterRestApi { NewInterpreterSettingRequest.class); Properties p = new Properties(); p.putAll(request.getProperties()); - // Option is deprecated from API, always use remote = true InterpreterGroup interpreterGroup = interpreterFactory.add(request.getName(), request.getGroup(), request.getDependencies(), - new InterpreterOption(true), + request.getOption(), p); InterpreterSetting setting = interpreterFactory.get(interpreterGroup.getId()); logger.info("new setting created with {}", setting.id()); @@ -126,9 +125,8 @@ public class InterpreterRestApi { try { UpdateInterpreterSettingRequest request = gson.fromJson(message, UpdateInterpreterSettingRequest.class); - // Option is deprecated from API, always use remote = true interpreterFactory.setPropertyAndRestart(settingId, - new InterpreterOption(true), + request.getOption(), request.getProperties(), request.getDependencies()); } catch (InterpreterException e) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index be46f13..2459af8 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -165,7 +165,7 @@ public class NotebookRestApi { setting.id(), setting.getName(), setting.getGroup(), - setting.getInterpreterGroup(), + setting.getInterpreterInfos(), true) ); } @@ -185,7 +185,7 @@ public class NotebookRestApi { setting.id(), setting.getName(), setting.getGroup(), - setting.getInterpreterGroup(), + setting.getInterpreterInfos(), false) ); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java index b74054c..e0ddacb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.rest.message; import java.util.List; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterSetting; /** * InterpreterSetting information for binding @@ -29,10 +30,12 @@ public class InterpreterSettingListForNoteBind { String name; String group; private boolean selected; - private List<Interpreter> interpreters; + private List<InterpreterSetting.InterpreterInfo> interpreters; public InterpreterSettingListForNoteBind(String id, String name, - String group, List<Interpreter> interpreters, boolean selected) { + String group, + List<InterpreterSetting.InterpreterInfo> interpreters, + boolean selected) { super(); this.id = id; this.name = name; @@ -65,11 +68,11 @@ public class InterpreterSettingListForNoteBind { this.group = group; } - public List<Interpreter> getInterpreterNames() { + public List<InterpreterSetting.InterpreterInfo> getInterpreterNames() { return interpreters; } - public void setInterpreterNames(List<Interpreter> interpreters) { + public void setInterpreterNames(List<InterpreterSetting.InterpreterInfo> interpreters) { this.interpreters = interpreters; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java index 22eb25f..a559fb5 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.interpreter.InterpreterOption; /** * NewInterpreterSetting rest api request message @@ -29,9 +30,10 @@ import org.apache.zeppelin.dep.Dependency; public class NewInterpreterSettingRequest { String name; String group; - // option was deprecated + Map<String, String> properties; List<Dependency> dependencies; + InterpreterOption option; public NewInterpreterSettingRequest() { @@ -52,4 +54,8 @@ public class NewInterpreterSettingRequest { public List<Dependency> getDependencies() { return dependencies; } + + public InterpreterOption getOption() { + return option; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java index a3f71ea..c2deeff 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java @@ -21,19 +21,21 @@ import java.util.List; import java.util.Properties; import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.interpreter.InterpreterOption; /** * UpdateInterpreterSetting rest api request message */ public class UpdateInterpreterSettingRequest { - // option was deprecated Properties properties; List<Dependency> dependencies; + InterpreterOption option; public UpdateInterpreterSettingRequest(Properties properties, - List<Dependency> dependencies) { + List<Dependency> dependencies, InterpreterOption option) { this.properties = properties; this.dependencies = dependencies; + this.option = option; } public Properties getProperties() { @@ -43,4 +45,8 @@ public class UpdateInterpreterSettingRequest { public List<Dependency> getDependencies() { return dependencies; } + + public InterpreterOption getOption() { + return option; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java index 95ffd37..1eec8f3 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java @@ -23,17 +23,14 @@ import org.apache.zeppelin.interpreter.InterpreterOption; /** * Created by eranw on 8/30/15. - * Omit InterpreterOption from serialization */ public class JsonExclusionStrategy implements ExclusionStrategy { public boolean shouldSkipClass(Class<?> arg0) { - //exclude only InterpreterOption - return InterpreterOption.class.equals(arg0); + return false; } public boolean shouldSkipField(FieldAttributes f) { - return false; } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java index 8c640ee..887d42a 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java @@ -23,10 +23,11 @@ import javax.ws.rs.core.NewCookie; import javax.ws.rs.core.Response.ResponseBuilder; import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterSerializer; +import org.apache.zeppelin.interpreter.InterpreterInfoSerializer; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.apache.zeppelin.interpreter.InterpreterSetting; /** * Json response builder. @@ -98,8 +99,9 @@ public class JsonResponse<T> { @Override public String toString() { - GsonBuilder gsonBuilder = new GsonBuilder() - .registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter( + InterpreterSetting.InterpreterInfo.class, + new InterpreterInfoSerializer()); if (pretty) { gsonBuilder.setPrettyPrinting(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java index 8886add..3862717 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java @@ -90,7 +90,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { // Call Create Setting REST API String jsonRequest = "{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"propvalue\"}," + "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," + - "\"dependencies\":[]}"; + "\"dependencies\":[]," + + "\"option\": { \"remote\": true, \"perNoteSession\": false }}"; PostMethod post = httpPost("/interpreter/setting/", jsonRequest); LOG.info("testSettingCRUD create response\n" + post.getResponseBodyAsString()); assertThat("test create method:", post, isCreated()); @@ -105,7 +106,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { // Call Update Setting REST API jsonRequest = "{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"Otherpropvalue\"}," + "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," + - "\"dependencies\":[]}"; + "\"dependencies\":[]," + + "\"option\": { \"remote\": true, \"perNoteSession\": false }}"; PutMethod put = httpPut("/interpreter/setting/" + newSettingId, jsonRequest); LOG.info("testSettingCRUD update response\n" + put.getResponseBodyAsString()); assertThat("test update method:", put, isAllowed()); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 774f30a..15d8826 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.rest.AbstractTestRestApi; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.Message.OP; @@ -86,14 +87,17 @@ public class NotebookServerTest extends AbstractTestRestApi { InterpreterGroup interpreterGroup = null; List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings(); for (InterpreterSetting setting : settings) { - if (setting.getInterpreterGroup() == null) { - continue; + if (setting.getName().equals("md")) { + interpreterGroup = setting.getInterpreterGroup(); + break; } - - interpreterGroup = setting.getInterpreterGroup(); - break; } + // start interpreter process + Paragraph p1 = note1.addParagraph(); + p1.setText("%md start remote interpreter process"); + note1.run(p1.getId()); + // add angularObject interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html b/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html index 6f46c4e..f7769b4 100644 --- a/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html +++ b/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html @@ -35,6 +35,11 @@ limitations under the License. </select> </div> + <b>Option</b> + <div class="checkbox"> + <label><input type="checkbox" style="top:-5px" ng-model="newInterpreterSetting.option.perNoteSession">Separate Interpreter for each note</input></label> + </div> + <b>Properties</b> <table class="table table-striped properties"> <tr> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-web/src/app/interpreter/interpreter.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/interpreter/interpreter.controller.js b/zeppelin-web/src/app/interpreter/interpreter.controller.js index 1e3800a..ab66033 100644 --- a/zeppelin-web/src/app/interpreter/interpreter.controller.js +++ b/zeppelin-web/src/app/interpreter/interpreter.controller.js @@ -72,7 +72,17 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope, $scope.addNewInterpreterDependency(settingId); } + // add missing field of option + if (!setting.option) { + setting.option = {}; + } + if (setting.option.remote === undefined) { + // remote always true for now + setting.option.remote = true; + } + var request = { + option: angular.copy(setting.option), properties: angular.copy(setting.properties), dependencies: angular.copy(setting.dependencies) }; @@ -214,7 +224,11 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope, name: undefined, group: undefined, properties: {}, - dependencies: [] + dependencies: [], + option: { + remote: true, + perNoteSession: false + } }; emptyNewProperty($scope.newInterpreterSetting); }; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-web/src/app/interpreter/interpreter.html ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/interpreter/interpreter.html b/zeppelin-web/src/app/interpreter/interpreter.html index f1f9a02..c30ad61 100644 --- a/zeppelin-web/src/app/interpreter/interpreter.html +++ b/zeppelin-web/src/app/interpreter/interpreter.html @@ -109,6 +109,19 @@ limitations under the License. </div> </div> <div class="row interpreter"> + <div class="col-md-12"> + <h5>Option</h5> + <div class="checkbox"> + <label> + <input type="checkbox" + style="top:-5px" + ng-disabled="!valueform.$visible" + ng-model="setting.option.perNoteSession"> + Separate Interpreter for each note</input> + </label> + </div> + </div> + <div ng-show="_.isEmpty(setting.properties) && _.isEmpty(setting.dependencies) || valueform.$hidden" class="col-md-12 gray40-message"> <em>Currently there are no properties and dependencies set for this interpreter</em> </div> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 3761709..bf1377c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -51,7 +51,6 @@ import java.util.*; /** * Manage interpreters. - * */ public class InterpreterFactory { Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); @@ -103,7 +102,8 @@ public class InterpreterFactory { GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); - builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + builder.registerTypeAdapter( + InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer()); gson = builder.create(); init(); @@ -197,16 +197,14 @@ public class InterpreterFactory { InterpreterSetting setting = interpreterSettings.get(settingId); logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId, setting.getName()); - for (Interpreter interpreter : setting.getInterpreterGroup()) { - logger.info(" className = {}", interpreter.getClassName()); - } } } private void loadFromFile() throws IOException { GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); - builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + builder.registerTypeAdapter( + InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer()); Gson gson = builder.create(); File settingFile = new File(conf.getInterpreterSettingPath()); @@ -241,14 +239,12 @@ public class InterpreterFactory { setting.id(), setting.getName(), setting.getGroup(), + setting.getInterpreterInfos(), + setting.getProperties(), setting.getDependencies(), setting.getOption()); - InterpreterGroup interpreterGroup = createInterpreterGroup( - setting.id(), - setting.getGroup(), - setting.getOption(), - setting.getProperties()); + InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption()); intpSetting.setInterpreterGroup(interpreterGroup); interpreterSettings.put(k, intpSetting); @@ -380,9 +376,27 @@ public class InterpreterFactory { throws InterpreterException, IOException, RepositoryException { synchronized (interpreterSettings) { + List<InterpreterSetting.InterpreterInfo> interpreterInfos = + new LinkedList<InterpreterSetting.InterpreterInfo>(); + + for (RegisteredInterpreter registeredInterpreter : + Interpreter.registeredInterpreters.values()) { + if (registeredInterpreter.getGroup().equals(groupName)) { + for (String className : interpreterClassList) { + if (registeredInterpreter.getClassName().equals(className)) { + interpreterInfos.add( + new InterpreterSetting.InterpreterInfo( + className, registeredInterpreter.getName())); + } + } + } + } + InterpreterSetting intpSetting = new InterpreterSetting( name, groupName, + interpreterInfos, + properties, dependencies, option); @@ -390,8 +404,7 @@ public class InterpreterFactory { loadInterpreterDependencies(intpSetting); } - InterpreterGroup interpreterGroup = createInterpreterGroup( - intpSetting.id(), groupName, option, properties); + InterpreterGroup interpreterGroup = createInterpreterGroup(intpSetting.id(), option); intpSetting.setInterpreterGroup(interpreterGroup); @@ -401,18 +414,12 @@ public class InterpreterFactory { } } - private InterpreterGroup createInterpreterGroup(String id, - String groupName, - InterpreterOption option, - Properties properties) + private InterpreterGroup createInterpreterGroup(String id, InterpreterOption option) throws InterpreterException, NullArgumentException { //When called from REST API without option we receive NPE if (option == null) throw new NullArgumentException("option"); - //When called from REST API without option we receive NPE - if (properties == null) - throw new NullArgumentException("properties"); AngularObjectRegistry angularObjectRegistry; @@ -430,19 +437,68 @@ public class InterpreterFactory { } interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); + return interpreterGroup; + } + + public void removeInterpretersForNote(InterpreterSetting interpreterSetting, + String noteId) { + if (!interpreterSetting.getOption().isPerNoteSession()) { + return; + } + InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(); + interpreterGroup.close(noteId); + interpreterGroup.destroy(noteId); + synchronized (interpreterGroup) { + interpreterGroup.remove(noteId); + interpreterGroup.notifyAll(); // notify createInterpreterForNote() + } + logger.info("Interpreter instance {} for note {} is removed", + interpreterSetting.getName(), + noteId); + } + + public void createInterpretersForNote( + InterpreterSetting interpreterSetting, + String noteId) { + InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(); + String groupName = interpreterSetting.getGroup(); + InterpreterOption option = interpreterSetting.getOption(); + Properties properties = interpreterSetting.getProperties(); + + // if interpreters are already there, wait until they're being removed + synchronized (interpreterGroup) { + long interpreterRemovalWaitStart = System.nanoTime(); + // interpreter process supposed to be terminated by RemoteInterpreterProcess.dereference() + // in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination of the process and + // removal from interpreter group take too long, throw an error. + long minTimeout = 10 * 1000 * 1000000; // 10 sec + long interpreterRemovalWaitTimeout = + Math.max(minTimeout, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 2); + while (interpreterGroup.containsKey(noteId)) { + if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) { + throw new InterpreterException("Can not create interpreter"); + } + try { + interpreterGroup.wait(1000); + } catch (InterruptedException e) { + logger.debug(e.getMessage(), e); + } + } + } + logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId); for (String className : interpreterClassList) { Set<String> keys = Interpreter.registeredInterpreters.keySet(); for (String intName : keys) { - RegisteredInterpreter info = Interpreter.registeredInterpreters - .get(intName); + RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName); if (info.getClassName().equals(className) && info.getGroup().equals(groupName)) { Interpreter intp; if (option.isRemote()) { intp = createRemoteRepl(info.getPath(), + noteId, info.getClassName(), properties, interpreterGroup.id); @@ -451,15 +507,25 @@ public class InterpreterFactory { info.getClassName(), properties); } - interpreterGroup.add(intp); + + synchronized (interpreterGroup) { + List<Interpreter> interpreters = interpreterGroup.get(noteId); + if (interpreters == null) { + interpreters = new LinkedList<Interpreter>(); + interpreterGroup.put(noteId, interpreters); + } + interpreters.add(intp); + } + logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created"); intp.setInterpreterGroup(interpreterGroup); break; } } } - return interpreterGroup; } + + public void remove(String id) throws IOException { synchronized (interpreterSettings) { if (interpreterSettings.containsKey(id)) { @@ -486,7 +552,7 @@ public class InterpreterFactory { } /** - * Get loaded interpreters + * Get interpreter settings * @return */ public List<InterpreterSetting> get() { @@ -508,8 +574,8 @@ public class InterpreterFactory { continue; } } + for (InterpreterSetting.InterpreterInfo intp : setting.getInterpreterInfos()) { - for (Interpreter intp : setting.getInterpreterGroup()) { if (className.equals(intp.getClassName())) { boolean alreadyAdded = false; for (InterpreterSetting st : orderedSettings) { @@ -536,15 +602,33 @@ public class InterpreterFactory { public void putNoteInterpreterSettingBinding(String noteId, List<String> settingList) throws IOException { + List<String> unBindedSettings = new LinkedList<String>(); + synchronized (interpreterSettings) { + List<String> oldSettings = interpreterBindings.get(noteId); + if (oldSettings != null) { + for (String oldSettingId : oldSettings) { + if (!settingList.contains(oldSettingId)) { + unBindedSettings.add(oldSettingId); + } + } + } interpreterBindings.put(noteId, settingList); saveToFile(); + + for (String settingId : unBindedSettings) { + InterpreterSetting setting = get(settingId); + removeInterpretersForNote(setting, noteId); + } } } public void removeNoteInterpreterSettingBinding(String noteId) { synchronized (interpreterSettings) { - interpreterBindings.remove(noteId); + List<String> settingIds = interpreterBindings.remove(noteId); + for (String settingId : settingIds) { + this.removeInterpretersForNote(get(settingId), noteId); + } } } @@ -582,9 +666,7 @@ public class InterpreterFactory { intpsetting.setOption(option); intpsetting.setDependencies(dependencies); - InterpreterGroup interpreterGroup = createInterpreterGroup( - intpsetting.id(), - intpsetting.getGroup(), option, properties); + InterpreterGroup interpreterGroup = createInterpreterGroup(intpsetting.id(), option); intpsetting.setInterpreterGroup(interpreterGroup); loadInterpreterDependencies(intpsetting); @@ -608,7 +690,7 @@ public class InterpreterFactory { InterpreterGroup interpreterGroup = createInterpreterGroup( intpsetting.id(), - intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties()); + intpsetting.getOption()); intpsetting.setInterpreterGroup(interpreterGroup); } else { throw new InterpreterException("Interpreter setting id " + id @@ -619,16 +701,18 @@ public class InterpreterFactory { private void stopJobAllInterpreter(InterpreterSetting intpsetting) { if (intpsetting != null) { - for (Interpreter intp : intpsetting.getInterpreterGroup()) { - for (Job job : intp.getScheduler().getJobsRunning()) { - job.abort(); - job.setStatus(Status.ABORT); - logger.info("Job " + job.getJobName() + " aborted "); - } - for (Job job : intp.getScheduler().getJobsWaiting()) { - job.abort(); - job.setStatus(Status.ABORT); - logger.info("Job " + job.getJobName() + " aborted "); + for (List<Interpreter> interpreters : intpsetting.getInterpreterGroup().values()) { + for (Interpreter intp : interpreters) { + for (Job job : intp.getScheduler().getJobsRunning()) { + job.abort(); + job.setStatus(Status.ABORT); + logger.info("Job " + job.getJobName() + " aborted "); + } + for (Job job : intp.getScheduler().getJobsWaiting()) { + job.abort(); + job.setStatus(Status.ABORT); + logger.info("Job " + job.getJobName() + " aborted "); + } } } } @@ -720,13 +804,13 @@ public class InterpreterFactory { } - private Interpreter createRemoteRepl(String interpreterPath, String className, + private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className, Properties property, String interpreterId) { int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId; int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter( - property, className, conf.getInterpreterRemoteRunnerPath(), + property, noteId, className, conf.getInterpreterRemoteRunnerPath(), interpreterPath, localRepoPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener)); return intp; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java new file mode 100644 index 0000000..5854983 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.lang.reflect.Type; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + + +/** + * InterpreterInfo class serializer for gson + * + */ +public class InterpreterInfoSerializer + implements JsonSerializer<InterpreterSetting.InterpreterInfo>, + JsonDeserializer<InterpreterSetting.InterpreterInfo> { + + @Override + public JsonElement serialize(InterpreterSetting.InterpreterInfo interpreterInfo, Type type, + JsonSerializationContext context) { + JsonObject json = new JsonObject(); + json.addProperty("class", interpreterInfo.getClassName()); + json.addProperty("name", interpreterInfo.getName()); + return json; + } + + @Override + public InterpreterSetting.InterpreterInfo deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + JsonObject jsonObject = json.getAsJsonObject(); + String className = jsonObject.get("class").getAsString(); + String name = jsonObject.get("name").getAsString(); + + return new InterpreterSetting.InterpreterInfo(className, name); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java index e2adecd..29a4748 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java @@ -22,6 +22,7 @@ package org.apache.zeppelin.interpreter; */ public class InterpreterOption { boolean remote; + boolean perNoteSession; public InterpreterOption() { remote = false; @@ -38,4 +39,12 @@ public class InterpreterOption { public void setRemote(boolean remote) { this.remote = remote; } + + public boolean isPerNoteSession() { + return perNoteSession; + } + + public void setPerNoteSession(boolean perNoteSession) { + this.perNoteSession = perNoteSession; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java deleted file mode 100644 index a2deb7e..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter; - -import java.lang.reflect.Type; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - - -/** - * Interpreter class serializer for gson - * - */ -public class InterpreterSerializer implements JsonSerializer<Interpreter>, - JsonDeserializer<Interpreter> { - - @Override - public JsonElement serialize(Interpreter interpreter, Type type, - JsonSerializationContext context) { - JsonObject json = new JsonObject(); - json.addProperty("class", interpreter.getClassName()); - json.addProperty( - "name", - Interpreter.findRegisteredInterpreterByClassName( - interpreter.getClassName()).getName()); - return json; - } - - @Override - public Interpreter deserialize(JsonElement json, Type typeOfT, - JsonDeserializationContext context) throws JsonParseException { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index e8080a2..5c93e3a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -34,27 +34,59 @@ public class InterpreterSetting { private String group; private String description; private Properties properties; - private InterpreterGroup interpreterGroup; + + // use 'interpreterGroup' as a field name to keep backward compatibility of + // conf/interpreter.json file format + private List<InterpreterInfo> interpreterGroup; + private transient InterpreterGroup interpreterGroupRef; private List<Dependency> dependencies; private InterpreterOption option; public InterpreterSetting(String id, String name, String group, + List<InterpreterInfo> interpreterInfos, + Properties properties, List<Dependency> dependencies, InterpreterOption option) { this.id = id; this.name = name; this.group = group; + this.interpreterGroup = interpreterInfos; + this.properties = properties; this.dependencies = dependencies; this.option = option; } public InterpreterSetting(String name, String group, + List<InterpreterInfo> interpreterInfos, + Properties properties, List<Dependency> dependencies, InterpreterOption option) { - this(generateId(), name, group, dependencies, option); + this(generateId(), name, group, interpreterInfos, properties, dependencies, option); + } + + /** + * Information of interpreters in this interpreter setting. + * this will be serialized for conf/interpreter.json and REST api response. + */ + public static class InterpreterInfo { + private final String name; + private final String className; + + public InterpreterInfo(String className, String name) { + this.className = className; + this.name = name; + } + + public String getName() { + return name; + } + + public String getClassName() { + return className; + } } public String id() { @@ -86,12 +118,11 @@ public class InterpreterSetting { } public InterpreterGroup getInterpreterGroup() { - return interpreterGroup; + return interpreterGroupRef; } public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - this.interpreterGroup = interpreterGroup; - this.properties = interpreterGroup.getProperty(); + this.interpreterGroupRef = interpreterGroup; } public Properties getProperties() { @@ -120,4 +151,8 @@ public class InterpreterSetting { public void setOption(InterpreterOption option) { this.option = option; } + + public List<InterpreterInfo> getInterpreterInfos() { + return interpreterGroup; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java index 509a064..4e7626b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java @@ -29,10 +29,11 @@ import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; /** - * Repl loader per note. + * Interpreter loader per note. */ public class NoteInterpreterLoader { private transient InterpreterFactory factory; + private static String SHARED_SESSION = "shared_session"; String noteId; public NoteInterpreterLoader(InterpreterFactory factory) { @@ -73,6 +74,37 @@ public class NoteInterpreterLoader { return settings; } + private String getInterpreterGroupKey(InterpreterSetting setting) { + if (!setting.getOption().isPerNoteSession()) { + return SHARED_SESSION; + } else { + return noteId; + } + } + + private List<Interpreter> createOrGetInterpreterList(InterpreterSetting setting) { + InterpreterGroup interpreterGroup = setting.getInterpreterGroup(); + synchronized (interpreterGroup) { + String key = getInterpreterGroupKey(setting); + if (!interpreterGroup.containsKey(key)) { + factory.createInterpretersForNote(setting, key); + } + return interpreterGroup.get(getInterpreterGroupKey(setting)); + } + } + + public void close() { + // close interpreters in this note session + List<InterpreterSetting> settings = this.getInterpreterSettings(); + if (settings == null || settings.size() == 0) { + return; + } + + for (InterpreterSetting setting : settings) { + factory.removeInterpretersForNote(setting, noteId); + } + } + public Interpreter get(String replName) { List<InterpreterSetting> settings = getInterpreterSettings(); @@ -81,7 +113,9 @@ public class NoteInterpreterLoader { } if (replName == null || replName.trim().length() == 0) { - return settings.get(0).getInterpreterGroup().getFirst(); + // get default settings (first available) + InterpreterSetting defaultSettings = settings.get(0); + return createOrGetInterpreterList(defaultSettings).get(0); } if (Interpreter.registeredInterpreters == null) { @@ -104,43 +138,47 @@ public class NoteInterpreterLoader { String interpreterClassName = registeredInterpreter.getClassName(); for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(); - for (Interpreter interpreter : intpGroup) { - if (interpreterClassName.equals(interpreter.getClassName())) { - return interpreter; + if (registeredInterpreter.getGroup().equals(setting.getGroup())) { + List<Interpreter> intpGroup = createOrGetInterpreterList(setting); + for (Interpreter interpreter : intpGroup) { + if (interpreterClassName.equals(interpreter.getClassName())) { + return interpreter; + } } } } + throw new InterpreterException(replName + " interpreter not found"); } else { // first assume replName is 'name' of interpreter. ('groupName' is ommitted) // search 'name' from first (default) interpreter group - InterpreterGroup intpGroup = settings.get(0).getInterpreterGroup(); - for (Interpreter interpreter : intpGroup) { - RegisteredInterpreter intp = Interpreter - .findRegisteredInterpreterByClassName(interpreter.getClassName()); - if (intp == null) { - continue; - } + InterpreterSetting defaultSetting = settings.get(0); + Interpreter.RegisteredInterpreter registeredInterpreter = + Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName); + if (registeredInterpreter != null) { + List<Interpreter> interpreters = createOrGetInterpreterList(defaultSetting); + for (Interpreter interpreter : interpreters) { + + RegisteredInterpreter intp = + Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName()); + if (intp == null) { + continue; + } - if (intp.getName().equals(replName)) { - return interpreter; + if (intp.getName().equals(replName)) { + return interpreter; + } } - } + throw new InterpreterException( + defaultSetting.getGroup() + "." + replName + " interpreter not found"); + } // next, assume replName is 'group' of interpreter ('name' is ommitted) // search interpreter group and return first interpreter. for (InterpreterSetting setting : settings) { - intpGroup = setting.getInterpreterGroup(); - Interpreter interpreter = intpGroup.get(0); - RegisteredInterpreter intp = Interpreter - .findRegisteredInterpreterByClassName(interpreter.getClassName()); - if (intp == null) { - continue; - } - - if (intp.getGroup().equals(replName)) { - return interpreter; + if (setting.getGroup().equals(replName)) { + List<Interpreter> interpreters = createOrGetInterpreterList(setting); + return interpreters.get(0); } } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 14d2fd8..79482b8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -244,7 +244,8 @@ public class Notebook { Note note = getNote(id); if (note != null) { note.getNoteReplLoader().setInterpreters(interpreterSettingIds); - replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); + // comment out while note.getNoteReplLoader().setInterpreters(...) do the same + // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); } } @@ -278,6 +279,7 @@ public class Notebook { synchronized (notes) { note = notes.remove(id); } + replFactory.removeNoteInterpreterSettingBinding(id); notebookIndex.deleteIndexDocs(note); // remove from all interpreter instance's angular object registry http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index ce7528f..75d105f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.JobListener; +import org.apache.zeppelin.scheduler.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -261,7 +262,17 @@ public class Paragraph extends Job implements Serializable, Cloneable { @Override protected boolean jobAbort() { Interpreter repl = getRepl(getRequiredReplName()); - Job job = repl.getScheduler().removeFromWaitingQueue(getId()); + if (repl == null) { + // when interpreters are already destroyed + return true; + } + + Scheduler scheduler = repl.getScheduler(); + if (scheduler == null) { + return true; + } + + Job job = scheduler.removeFromWaitingQueue(getId()); if (job != null) { job.setStatus(Status.ABORT); } else { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 7020a47..d215e7c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -22,8 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.util.LinkedList; import java.util.List; import java.util.Properties; @@ -87,9 +86,12 @@ public class InterpreterFactoryTest { @Test public void testBasic() { List<String> all = factory.getDefaultInterpreterSettingList(); + InterpreterSetting setting = factory.get(all.get(0)); + InterpreterGroup interpreterGroup = setting.getInterpreterGroup(); + factory.createInterpretersForNote(setting, "session"); // get interpreter - Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst(); + Interpreter repl1 = interpreterGroup.get("session").get(0); assertFalse(((LazyOpenInterpreter) repl1).isOpen()); repl1.interpret("repl1", context); assertTrue(((LazyOpenInterpreter) repl1).isOpen()); @@ -99,23 +101,14 @@ public class InterpreterFactoryTest { // restart interpreter factory.restart(all.get(0)); - repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst(); - assertFalse(((LazyOpenInterpreter) repl1).isOpen()); + assertNull(setting.getInterpreterGroup().get("session")); } @Test public void testFactoryDefaultList() throws IOException, RepositoryException { - // get default list from default setting + // get default settings List<String> all = factory.getDefaultInterpreterSettingList(); assertEquals(2, all.size()); - assertEquals(factory.get(all.get(0)).getInterpreterGroup().getFirst().getClassName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); - - // add setting - factory.add("a mock", "mock2", new LinkedList<Dependency>(), new InterpreterOption(false), new Properties()); - all = factory.getDefaultInterpreterSettingList(); - assertEquals(2, all.size()); - assertEquals("mock1", factory.get(all.get(0)).getName()); - assertEquals("a mock", factory.get(all.get(1)).getName()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java index 09259b1..079846c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java @@ -45,7 +45,13 @@ public class MockInterpreter1 extends Interpreter{ @Override public InterpreterResult interpret(String st, InterpreterContext context) { - return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st); + + if ("getId".equals(st)) { + // get unique id of this interpreter instance + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + } else { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java index 04a6ceb..aa98afd 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java @@ -16,8 +16,6 @@ */ package org.apache.zeppelin.notebook; -import static org.junit.Assert.assertEquals; - import java.io.File; import java.io.IOException; import java.util.Collections; @@ -36,6 +34,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + public class NoteInterpreterLoaderTest { private File tmpDir; @@ -92,6 +92,43 @@ public class NoteInterpreterLoaderTest { assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1.mock1").getClassName()); assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("group1.mock11").getClassName()); assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2.mock2").getClassName()); + + loader.close(); + } + + @Test + public void testNoteSession() throws IOException { + NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory); + loaderA.setNoteId("noteA"); + loaderA.setInterpreters(factory.getDefaultInterpreterSettingList()); + loaderA.getInterpreterSettings().get(0).getOption().setPerNoteSession(true); + + NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory); + loaderB.setNoteId("noteB"); + loaderB.setInterpreters(factory.getDefaultInterpreterSettingList()); + loaderB.getInterpreterSettings().get(0).getOption().setPerNoteSession(true); + + // interpreters are not created before accessing it + assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA")); + assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB")); + + // per note session interpreter instance in the same interpreter process + assertTrue( + loaderA.get(null).getInterpreterGroup().getRemoteInterpreterProcess() == + loaderB.get(null).getInterpreterGroup().getRemoteInterpreterProcess()); + + // interpreters are created after accessing it + assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA")); + assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB")); + + // when + loaderA.close(); + loaderB.close(); + + // interpreters are destroyed after close + assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA")); + assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB")); + } private void delete(File file){ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 0b5af2c..db7a466 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -26,23 +26,14 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.HashSet; +import java.util.*; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.interpreter.InterpreterFactory; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; import org.apache.zeppelin.notebook.repo.NotebookRepo; @@ -489,6 +480,109 @@ public class NotebookTest implements JobListenerFactory{ assertTrue(isAborted); } + @Test + public void testPerSessionInterpreterCloseOnNoteRemoval() throws IOException { + // create a notes + Note note1 = notebook.createNote(); + Paragraph p1 = note1.addParagraph(); + p1.setText("getId"); + + // restart interpreter with per note session enabled + for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) { + setting.getOption().setPerNoteSession(true); + notebook.getInterpreterFactory().restart(setting.id()); + } + + note1.run(p1.getId()); + while (p1.getStatus() != Status.FINISHED) Thread.yield(); + InterpreterResult result = p1.getResult(); + + // remove note and recreate + notebook.removeNote(note1.getId()); + note1 = notebook.createNote(); + p1 = note1.addParagraph(); + p1.setText("getId"); + + note1.run(p1.getId()); + while (p1.getStatus() != Status.FINISHED) Thread.yield(); + assertNotEquals(p1.getResult().message(), result.message()); + + notebook.removeNote(note1.getId()); + } + + @Test + public void testPerSessionInterpreter() throws IOException { + // create two notes + Note note1 = notebook.createNote(); + Paragraph p1 = note1.addParagraph(); + + Note note2 = notebook.createNote(); + Paragraph p2 = note2.addParagraph(); + + p1.setText("getId"); + p2.setText("getId"); + + // run per note session disabled + note1.run(p1.getId()); + note2.run(p2.getId()); + + while (p1.getStatus() != Status.FINISHED) Thread.yield(); + while (p2.getStatus() != Status.FINISHED) Thread.yield(); + + assertEquals(p1.getResult().message(), p2.getResult().message()); + + + // restart interpreter with per note session enabled + for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) { + setting.getOption().setPerNoteSession(true); + notebook.getInterpreterFactory().restart(setting.id()); + } + + // run per note session enabled + note1.run(p1.getId()); + note2.run(p2.getId()); + + while (p1.getStatus() != Status.FINISHED) Thread.yield(); + while (p2.getStatus() != Status.FINISHED) Thread.yield(); + + assertNotEquals(p1.getResult().message(), p2.getResult().message()); + + notebook.removeNote(note1.getId()); + notebook.removeNote(note2.getId()); + } + + @Test + public void testPerSessionInterpreterCloseOnUnbindInterpreterSetting() throws IOException { + // create a notes + Note note1 = notebook.createNote(); + Paragraph p1 = note1.addParagraph(); + p1.setText("getId"); + + // restart interpreter with per note session enabled + for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) { + setting.getOption().setPerNoteSession(true); + notebook.getInterpreterFactory().restart(setting.id()); + } + + note1.run(p1.getId()); + while (p1.getStatus() != Status.FINISHED) Thread.yield(); + InterpreterResult result = p1.getResult(); + + + // unbind, and rebind setting. that result interpreter instance close + List<String> bindedSettings = notebook.getBindedInterpreterSettingsIds(note1.getId()); + notebook.bindInterpretersToNote(note1.getId(), new LinkedList<String>()); + notebook.bindInterpretersToNote(note1.getId(), bindedSettings); + + note1.run(p1.getId()); + while (p1.getStatus() != Status.FINISHED) Thread.yield(); + + assertNotEquals(result.message(), p1.getResult().message()); + + notebook.removeNote(note1.getId()); + } + + private void delete(File file){ if(file.isFile()) file.delete(); else if(file.isDirectory()){
