http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java new file mode 100644 index 0000000..95235e5 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -0,0 +1,975 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.Job.Status; +import org.apache.zeppelin.scheduler.Scheduler; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +public class RemoteInterpreterTest { + + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private InterpreterGroup intpGroup; + private HashMap<String, String> env; + + @Before + public void setUp() throws Exception { + intpGroup = new InterpreterGroup(); + env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + } + + @After + public void tearDown() throws Exception { + intpGroup.close(); + } + + private RemoteInterpreter createMockInterpreterA(Properties p) { + return createMockInterpreterA(p, "note"); + } + + private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) { + return new RemoteInterpreter( + p, + noteId, + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + } + + private RemoteInterpreter createMockInterpreterB(Properties p) { + return createMockInterpreterB(p, "note"); + } + + private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) { + return new RemoteInterpreter( + p, + noteId, + MockInterpreterB.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + } + + @Test + public void testRemoteInterperterCall() throws TTransportException, IOException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = createMockInterpreterB(p); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + process.equals(intpB.getInterpreterProcess()); + + assertFalse(process.isRunning()); + assertEquals(0, process.getNumIdleClient()); + assertEquals(0, process.referenceCount()); + + intpA.open(); // initializa all interpreters in the same group + assertTrue(process.isRunning()); + assertEquals(1, process.getNumIdleClient()); + assertEquals(1, process.referenceCount()); + + intpA.interpret("1", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + intpB.open(); + assertEquals(1, process.referenceCount()); + + intpA.close(); + assertEquals(0, process.referenceCount()); + intpB.close(); + assertEquals(0, process.referenceCount()); + + assertFalse(process.isRunning()); + + } + + @Test + public void testExecuteIncorrectPrecode() throws TTransportException, IOException { + Properties p = new Properties(); + p.put("zeppelin.MockInterpreterA.precode", "fail test"); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + + intpA.open(); + + InterpreterResult result = intpA.interpret("1", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + + + intpA.close(); + assertEquals(Code.ERROR, result.code()); + } + + @Test + public void testExecuteCorrectPrecode() throws TTransportException, IOException { + Properties p = new Properties(); + p.put("zeppelin.MockInterpreterA.precode", "2"); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + + intpA.open(); + + InterpreterResult result = intpA.interpret("1", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + + + intpA.close(); + assertEquals(Code.SUCCESS, result.code()); + assertEquals("1", result.message().get(0).getData()); + } + + @Test + public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { + Properties p = new Properties(); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + InterpreterResult ret = intpA.interpret("non numeric value", + new InterpreterContext( + "noteId", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + assertEquals(Code.ERROR, ret.code()); + } + + @Test + public void testRemoteSchedulerSharing() throws TTransportException, IOException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = new RemoteInterpreter( + p, + "note", + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = new RemoteInterpreter( + p, + "note", + MockInterpreterB.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + long start = System.currentTimeMillis(); + InterpreterResult ret = intpA.interpret("500", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + assertEquals("500", ret.message().get(0).getData()); + + ret = intpB.interpret("500", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + assertEquals("1000", ret.message().get(0).getData()); + long end = System.currentTimeMillis(); + assertTrue(end - start >= 1000); + + + intpA.close(); + intpB.close(); + } + + @Test + public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + final RemoteInterpreter intpB = createMockInterpreterB(p); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + long start = System.currentTimeMillis(); + Job jobA = new Job("jobA", null) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("500", + new InterpreterContext( + "note", + "jobA", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + Job jobB = new Job("jobB", null) { + + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpB.interpret("500", + new InterpreterContext( + "note", + "jobB", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpB.getScheduler().submit(jobB); + // wait until both job finished + while (jobA.getStatus() != Status.FINISHED || + jobB.getStatus() != Status.FINISHED) { + Thread.sleep(100); + } + long end = System.currentTimeMillis(); + assertTrue(end - start >= 1000); + + assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message().get(0).getData()); + + intpA.close(); + intpB.close(); + } + + @Test + public void testRunOrderPreserved() throws InterruptedException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + int concurrency = 3; + final List<InterpreterResultMessage> results = new LinkedList<>(); + + Scheduler scheduler = intpA.getScheduler(); + for (int i = 0; i < concurrency; i++) { + final String jobId = Integer.toString(i); + scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( + "note", + jobId, + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + synchronized (results) { + results.addAll(ret.message()); + results.notify(); + } + return null; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + } + + // wait for job finished + synchronized (results) { + while (results.size() != concurrency) { + results.wait(300); + } + } + + int i = 0; + for (InterpreterResultMessage result : results) { + assertEquals(Integer.toString(i++), result.getData()); + } + assertEquals(concurrency, i); + + intpA.close(); + } + + + @Test + public void testRunParallel() throws InterruptedException { + Properties p = new Properties(); + p.put("parallel", "true"); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + int concurrency = 4; + final int timeToSleep = 1000; + final List<InterpreterResultMessage> results = new LinkedList<>(); + long start = System.currentTimeMillis(); + + Scheduler scheduler = intpA.getScheduler(); + for (int i = 0; i < concurrency; i++) { + final String jobId = Integer.toString(i); + scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + String stmt = Integer.toString(timeToSleep); + InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext( + "note", + jobId, + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + synchronized (results) { + results.addAll(ret.message()); + results.notify(); + } + return stmt; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + } + + // wait for job finished + synchronized (results) { + while (results.size() != concurrency) { + results.wait(300); + } + } + + long end = System.currentTimeMillis(); + + assertTrue(end - start < timeToSleep * concurrency); + + intpA.close(); + } + + @Test + public void testInterpreterGroupResetBeforeProcessStarts() { + Properties p = new Properties(); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpA.setInterpreterGroup(intpGroup); + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + + intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); + + assertNotSame(processA.hashCode(), processB.hashCode()); + } + + @Test + public void testInterpreterGroupResetAfterProcessFinished() { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpA.setInterpreterGroup(intpGroup); + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.open(); + + processA.dereference(); // intpA.close(); + + intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); + + assertNotSame(processA.hashCode(), processB.hashCode()); + } + + @Test + public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Job jobA = new Job("jobA", null) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("2000", + new InterpreterContext( + "note", + "jobA", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + // wait for job started + while (intpA.getScheduler().getJobsRunning().size() == 0) { + Thread.sleep(100); + } + + // restart interpreter + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.close(); + + InterpreterGroup newInterpreterGroup = + new InterpreterGroup(intpA.getInterpreterGroup().getId()); + newInterpreterGroup.put("note", new LinkedList<Interpreter>()); + + intpA.setInterpreterGroup(newInterpreterGroup); + intpA.open(); + RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); + + assertNotSame(processA.hashCode(), processB.hashCode()); + + } + + @Test + public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = createMockInterpreterB(p); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + assertEquals(intpA.getScheduler(), intpB.getScheduler()); + } + + @Test + public void testMultiInterpreterSession() { + Properties p = new Properties(); + intpGroup.put("sessionA", new LinkedList<Interpreter>()); + intpGroup.put("sessionB", new LinkedList<Interpreter>()); + + RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA"); + intpGroup.get("sessionA").add(intpAsessionA); + intpAsessionA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA"); + intpGroup.get("sessionA").add(intpBsessionA); + intpBsessionA.setInterpreterGroup(intpGroup); + + intpAsessionA.open(); + intpBsessionA.open(); + + assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler()); + + RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB"); + intpGroup.get("sessionB").add(intpAsessionB); + intpAsessionB.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB"); + intpGroup.get("sessionB").add(intpBsessionB); + intpBsessionB.setInterpreterGroup(intpGroup); + + intpAsessionB.open(); + intpBsessionB.open(); + + assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler()); + assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler()); + } + + @Test + public void should_push_local_angular_repo_to_remote() throws Exception { + //Given + final Client client = Mockito.mock(Client.class); + final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId", + MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null, + null, "anonymous", false); + final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null); + registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId"); + final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId"); + interpreterGroup.setAngularObjectRegistry(registry); + intr.setInterpreterGroup(interpreterGroup); + + final java.lang.reflect.Type registryType = new TypeToken<Map<String, + Map<String, AngularObject>>>() {}.getType(); + final Gson gson = new Gson(); + final String expected = gson.toJson(registry.getRegistry(), registryType); + + //When + intr.pushAngularObjectRegistryToRemote(client); + + //Then + Mockito.verify(client).angularRegistryPush(expected); + } + + @Test + public void testEnvStringPattern() { + assertFalse(RemoteInterpreterUtils.isEnvString(null)); + assertFalse(RemoteInterpreterUtils.isEnvString("")); + assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF")); + assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123")); + } + + @Test + public void testEnvronmentAndPropertySet() { + Properties p = new Properties(); + p.setProperty("MY_ENV1", "env value 1"); + p.setProperty("my.property.1", "property value 1"); + + RemoteInterpreter intp = new RemoteInterpreter( + p, + "note", + MockInterpreterEnv.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intp); + intp.setInterpreterGroup(intpGroup); + + intp.open(); + + InterpreterContext context = new InterpreterContext( + "noteId", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + + assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData()); + assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code()); + assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code()); + assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData()); + + intp.close(); + } + + @Test + public void testSetProgress() throws InterruptedException { + // given MockInterpreterA set progress through InterpreterContext + Properties p = new Properties(); + p.setProperty("progress", "50"); + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + final InterpreterContext context1 = new InterpreterContext( + "noteId", + "id1", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + InterpreterContext context2 = new InterpreterContext( + "noteId", + "id2", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + + assertEquals(0, intpA.getProgress(context1)); + assertEquals(0, intpA.getProgress(context2)); + + // when interpreter update progress through InterpreterContext + Thread t = new Thread() { + public void run() { + InterpreterResult ret = intpA.interpret("1000", context1); + } + }; + t.start(); + + // then progress need to be updated in given context + while(intpA.getProgress(context1) == 0) Thread.yield(); + assertEquals(50, intpA.getProgress(context1)); + assertEquals(0, intpA.getProgress(context2)); + + t.join(); + assertEquals(0, intpA.getProgress(context1)); + assertEquals(0, intpA.getProgress(context2)); + } + +}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java new file mode 100644 index 0000000..975d6ea --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.junit.Test; + +public class RemoteInterpreterUtilsTest { + + @Test + public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { + assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java new file mode 100644 index 0000000..50d9888 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +public class MockInterpreterA extends Interpreter { + + private String lastSt; + + public MockInterpreterA(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + if (property.containsKey("progress")) { + context.setProgress(Integer.parseInt(getProperty("progress"))); + } + try { + Thread.sleep(Long.parseLong(st)); + this.lastSt = st; + } catch (NumberFormatException | InterruptedException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(Code.SUCCESS, st); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { + return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); + } else { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java new file mode 100644 index 0000000..d4b26ad --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; + +public class MockInterpreterAngular extends Interpreter { + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterAngular(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String name = null; + if (stmt.length >= 2) { + name = stmt[1]; + } + String value = null; + if (stmt.length == 3) { + value = stmt[2]; + } + + AngularObjectRegistry registry = context.getAngularObjectRegistry(); + + if (cmd.equals("add")) { + registry.add(name, value, context.getNoteId(), null); + registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher + (null) { + + @Override + public void watch(Object oldObject, Object newObject, + InterpreterContext context) { + numWatch.incrementAndGet(); + } + + }); + } else if (cmd.equalsIgnoreCase("update")) { + registry.get(name, context.getNoteId(), null).set(value); + } else if (cmd.equals("remove")) { + registry.remove(name, context.getNoteId(), null); + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e); + } + + String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch + .get()); + return new InterpreterResult(Code.SUCCESS, msg); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java new file mode 100644 index 0000000..7103335 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; + +public class MockInterpreterB extends Interpreter { + + public MockInterpreterB(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + MockInterpreterA intpA = getInterpreterA(); + String intpASt = intpA.getLastStatement(); + long timeToSleep = Long.parseLong(st); + if (intpASt != null) { + timeToSleep += Long.parseLong(intpASt); + } + try { + Thread.sleep(timeToSleep); + } catch (NumberFormatException | InterruptedException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + public MockInterpreterA getInterpreterA() { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + synchronized (interpreterGroup) { + for (List<Interpreter> interpreters : interpreterGroup.values()) { + boolean belongsToSameNoteGroup = false; + MockInterpreterA a = null; + for (Interpreter intp : interpreters) { + if (intp.getClassName().equals(MockInterpreterA.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + a = (MockInterpreterA) p; + } + + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + if (this == p) { + belongsToSameNoteGroup = true; + } + } + if (belongsToSameNoteGroup) { + return a; + } + } + } + return null; + } + + @Override + public Scheduler getScheduler() { + MockInterpreterA intpA = getInterpreterA(); + if (intpA != null) { + return intpA.getScheduler(); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java new file mode 100644 index 0000000..12e11f7 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.util.List; +import java.util.Properties; + + +public class MockInterpreterEnv extends Interpreter { + + public MockInterpreterEnv(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] cmd = st.split(" "); + if (cmd[0].equals("getEnv")) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1])); + } else if (cmd[0].equals("getProperty")){ + return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1])); + } else { + return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java new file mode 100644 index 0000000..349315c --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * MockInterpreter to test outputstream + */ +public class MockInterpreterOutputStream extends Interpreter { + private String lastSt; + + public MockInterpreterOutputStream(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] ret = st.split(":"); + try { + if (ret[1] != null) { + context.out.write(ret[1]); + } + } catch (IOException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? + ret[2] : ""); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java new file mode 100644 index 0000000..c4ff6ab --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.gson.Gson; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; + +public class MockInterpreterResourcePool extends Interpreter { + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterResourcePool(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String noteId = null; + String paragraphId = null; + String name = null; + if (stmt.length >= 2) { + String[] npn = stmt[1].split(":"); + if (npn.length >= 3) { + noteId = npn[0]; + paragraphId = npn[1]; + name = npn[2]; + } else { + name = stmt[1]; + } + } + String value = null; + if (stmt.length >= 3) { + value = stmt[2]; + } + + ResourcePool resourcePool = context.getResourcePool(); + Object ret = null; + if (cmd.equals("put")) { + resourcePool.put(noteId, paragraphId, name, value); + } else if (cmd.equalsIgnoreCase("get")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (resource != null) { + ret = resourcePool.get(noteId, paragraphId, name).get(); + } else { + ret = ""; + } + } else if (cmd.equals("remove")) { + ret = resourcePool.remove(noteId, paragraphId, name); + } else if (cmd.equals("getAll")) { + ret = resourcePool.getAll(); + } else if (cmd.equals("invoke")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (stmt.length >=4) { + Resource res = resource.invokeMethod(value, null, null, stmt[3]); + ret = res.get(); + } else { + ret = resource.invokeMethod(value, null, null); + } + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + } + + Gson gson = new Gson(); + return new InterpreterResult(Code.SUCCESS, gson.toJson(ret)); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java new file mode 100644 index 0000000..5632513 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.InterpreterInfo; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.DefaultInterpreterProperty; +import org.apache.zeppelin.interpreter.InterpreterProperty; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.mock.MockInterpreter1; +import org.apache.zeppelin.interpreter.mock.MockInterpreter11; +import org.apache.zeppelin.interpreter.mock.MockInterpreter2; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class NoteInterpreterLoaderTest { + + private File tmpDir; + private ZeppelinConfiguration conf; + private InterpreterFactory factory; + private InterpreterSettingManager interpreterSettingManager; + private DependencyResolver depResolver; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + new File(tmpDir, "conf").mkdirs(); + + System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); + + conf = ZeppelinConfiguration.create(); + + depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, Maps.<String, Object>newHashMap())); + interpreterInfos.add(new InterpreterInfo(MockInterpreter11.class.getName(), "mock11", false, Maps.<String, Object>newHashMap())); + ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); + interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, Maps.<String, Object>newHashMap())); + + interpreterSettingManager.add("group1", interpreterInfos, Lists.<Dependency>newArrayList(), new InterpreterOption(), Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock", null); + interpreterSettingManager.add("group2", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(), Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock", null); + + interpreterSettingManager.createNewSetting("group1", "group1", Lists.<Dependency>newArrayList(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); + interpreterSettingManager.createNewSetting("group2", "group2", Lists.<Dependency>newArrayList(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); + + + } + + @After + public void tearDown() throws Exception { + delete(tmpDir); + Interpreter.registeredInterpreters.clear(); + } + + @Test + public void testGetInterpreter() throws IOException { + interpreterSettingManager.setInterpreters("user", "note", interpreterSettingManager.getDefaultInterpreterSettingList()); + + // when there're no interpreter selection directive + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("user", "note", null).getClassName()); + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("user", "note", "").getClassName()); + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("user", "note", " ").getClassName()); + + // when group name is omitted + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", factory.getInterpreter("user", "note", "mock11").getClassName()); + + // when 'name' is ommitted + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("user", "note", "group1").getClassName()); + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", factory.getInterpreter("user", "note", "group2").getClassName()); + + // when nothing is ommitted + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("user", "note", "group1.mock1").getClassName()); + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", factory.getInterpreter("user", "note", "group1.mock11").getClassName()); + assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", factory.getInterpreter("user", "note", "group2.mock2").getClassName()); + + interpreterSettingManager.closeNote("user", "note"); + } + + @Test + public void testNoteSession() throws IOException { + interpreterSettingManager.setInterpreters("user", "noteA", interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.getInterpreterSettings("noteA").get(0).getOption().setPerNote(InterpreterOption.SCOPED); + + interpreterSettingManager.setInterpreters("user", "noteB", interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.SCOPED); + + // interpreters are not created before accessing it + assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA")); + assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB")); + + factory.getInterpreter("user", "noteA", null).open(); + factory.getInterpreter("user", "noteB", null).open(); + + assertTrue( + factory.getInterpreter("user", "noteA", null).getInterpreterGroup().getId().equals( + factory.getInterpreter("user", "noteB", null).getInterpreterGroup().getId())); + + // interpreters are created after accessing it + assertNotNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA")); + assertNotNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB")); + + // invalid close + interpreterSettingManager.closeNote("user", "note"); + assertNotNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "shared_process").get("noteA")); + assertNotNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "shared_process").get("noteB")); + + // when + interpreterSettingManager.closeNote("user", "noteA"); + interpreterSettingManager.closeNote("user", "noteB"); + + // interpreters are destroyed after close + assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "shared_process").get("noteA")); + assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "shared_process").get("noteB")); + + } + + @Test + public void testNotePerInterpreterProcess() throws IOException { + interpreterSettingManager.setInterpreters("user", "noteA", interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.getInterpreterSettings("noteA").get(0).getOption().setPerNote(InterpreterOption.ISOLATED); + + interpreterSettingManager.setInterpreters("user", "noteB", interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.ISOLATED); + + // interpreters are not created before accessing it + assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session")); + assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session")); + + factory.getInterpreter("user", "noteA", null).open(); + factory.getInterpreter("user", "noteB", null).open(); + + // per note interpreter process + assertFalse( + factory.getInterpreter("user", "noteA", null).getInterpreterGroup().getId().equals( + factory.getInterpreter("user", "noteB", null).getInterpreterGroup().getId())); + + // interpreters are created after accessing it + assertNotNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session")); + assertNotNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session")); + + // when + interpreterSettingManager.closeNote("user", "noteA"); + interpreterSettingManager.closeNote("user", "noteB"); + + // interpreters are destroyed after close + assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session")); + assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session")); + } + + @Test + public void testNoteInterpreterCloseForAll() throws IOException { + interpreterSettingManager.setInterpreters("user", "FitstNote", interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.getInterpreterSettings("FitstNote").get(0).getOption().setPerNote(InterpreterOption.SCOPED); + + interpreterSettingManager.setInterpreters("user", "yourFirstNote", interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.getInterpreterSettings("yourFirstNote").get(0).getOption().setPerNote(InterpreterOption.ISOLATED); + + // interpreters are not created before accessing it + assertNull(interpreterSettingManager.getInterpreterSettings("FitstNote").get(0).getInterpreterGroup("user", "FitstNote").get("FitstNote")); + assertNull(interpreterSettingManager.getInterpreterSettings("yourFirstNote").get(0).getInterpreterGroup("user", "yourFirstNote").get("yourFirstNote")); + + Interpreter firstNoteIntp = factory.getInterpreter("user", "FitstNote", "group1.mock1"); + Interpreter yourFirstNoteIntp = factory.getInterpreter("user", "yourFirstNote", "group1.mock1"); + + firstNoteIntp.open(); + yourFirstNoteIntp.open(); + + assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen()); + assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen()); + + interpreterSettingManager.closeNote("user", "FitstNote"); + + assertFalse(((LazyOpenInterpreter)firstNoteIntp).isOpen()); + assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen()); + + //reopen + firstNoteIntp.open(); + + assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen()); + assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen()); + + // invalid check + interpreterSettingManager.closeNote("invalid", "Note"); + + assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen()); + assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen()); + + // invalid contains value check + interpreterSettingManager.closeNote("u", "Note"); + + assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen()); + assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen()); + } + + + private void delete(File file){ + if(file.isFile()) file.delete(); + else if(file.isDirectory()){ + File [] files = file.listFiles(); + if(files!=null && files.length>0){ + for(File f : files){ + delete(f); + } + } + file.delete(); + } + } +}