http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java new file mode 100644 index 0000000..21d7526 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java @@ -0,0 +1,74 @@ +package org.apache.zeppelin.interpreter; + +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +import static org.mockito.Mockito.mock; + + +/** + * This class will load configuration files under + * src/test/resources/interpreter + * src/test/resources/conf + * + * to construct InterpreterSettingManager and InterpreterFactory properly + * + */ +public abstract class AbstractInterpreterTest { + protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class); + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? 
+ "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + protected InterpreterSettingManager interpreterSettingManager; + protected InterpreterFactory interpreterFactory; + protected File testRootDir; + protected File interpreterDir; + protected File confDir; + protected File notebookDir; + protected ZeppelinConfiguration conf; + + @Before + public void setUp() throws Exception { + // copy the resources files to a temp folder + testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis()); + testRootDir.mkdirs(); + LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath()); + interpreterDir = new File(testRootDir, "interpreter"); + confDir = new File(testRootDir, "conf"); + notebookDir = new File(testRootDir, "notebook"); + + interpreterDir.mkdirs(); + confDir.mkdirs(); + notebookDir.mkdirs(); + + FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir); + FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir); + + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT); + + conf = new ZeppelinConfiguration(); + interpreterSettingManager = new InterpreterSettingManager(conf, + mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); + interpreterFactory = new InterpreterFactory(interpreterSettingManager); + } + + @After + public void tearDown() throws Exception { + interpreterSettingManager.close(); + 
FileUtils.deleteDirectory(testRootDir); + } +}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java new file mode 100644 index 0000000..8eea4b2 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.interpreter; + +import java.util.Properties; + +/** + * + */ +public class DoubleEchoInterpreter extends Interpreter { + + public DoubleEchoInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, st + "," + st); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return null; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java deleted file mode 100644 index a7a6eb9..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.zeppelin.interpreter; - -import java.util.Properties; - -/** - * - */ -public class DummyInterpreter extends Interpreter { - - public DummyInterpreter(Properties property) { - super(property); - } - - @Override - public void open() { - - } - - @Override - public void close() { - - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - return null; - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return null; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } -} 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java new file mode 100644 index 0000000..e7a04f3 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.interpreter; + +import java.util.Properties; + +/** + * Just return the received statement back + */ +public class EchoInterpreter extends Interpreter { + + public EchoInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + if (Boolean.parseBoolean(property.getProperty("zeppelin.interpreter.echo.fail", "false"))) { + return new InterpreterResult(InterpreterResult.Code.ERROR); + } else { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, st); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java new file mode 100644 index 0000000..f3137d9 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class InterpreterFactoryTest extends AbstractInterpreterTest { + + @Test + public void testGetFactory() throws IOException { + // no default interpreter because there's no interpreter setting bound to this note + assertNull(interpreterFactory.getInterpreter("user1", "note1", "")); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "") instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", ""); + // EchoInterpreter is the default interpreter (see zeppelin-interpreter/src/test/resources/conf/interpreter.json) + assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName()); + + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test"); + assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName()); + + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "echo") instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) 
interpreterFactory.getInterpreter("user1", "note1", "echo"); + assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName()); + + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "double_echo") instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "double_echo"); + assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName()); + } + + @Test(expected = InterpreterException.class) + public void testUnknownRepl1() throws IOException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl"); + } + + @Test + public void testUnknownRepl2() throws IOException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertNull(interpreterFactory.getInterpreter("user1", "note1", "unknown_repl")); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterGroupTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterGroupTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterGroupTest.java new file mode 100644 index 0000000..11607bb --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterGroupTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonatype.aether.RepositoryException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; + + +public class InterpreterGroupTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterGroupTest.class); + + private InterpreterSetting interpreterSetting; + + @Before + public void setUp() throws IOException, RepositoryException { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.SCOPED); + interpreterOption.setRemote(false); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + 
.create(); + } + + @Test + public void testInterpreterGroup() { + InterpreterGroup interpreterGroup = new InterpreterGroup("group_1", interpreterSetting); + assertEquals(0, interpreterGroup.getSessionNum()); + + // create session_1 + List<Interpreter> interpreters = interpreterGroup.getOrCreateSession("user1", "session_1"); + assertEquals(2, interpreters.size()); + assertEquals(EchoInterpreter.class.getName(), interpreters.get(0).getClassName()); + assertEquals(DoubleEchoInterpreter.class.getName(), interpreters.get(1).getClassName()); + assertEquals(1, interpreterGroup.getSessionNum()); + + // get the same interpreters when interpreterGroup.getOrCreateSession is invoked again + assertEquals(interpreters, interpreterGroup.getOrCreateSession("user1", "session_1")); + assertEquals(1, interpreterGroup.getSessionNum()); + + // create session_2 + List<Interpreter> interpreters2 = interpreterGroup.getOrCreateSession("user1", "session_2"); + assertEquals(2, interpreters2.size()); + assertEquals(EchoInterpreter.class.getName(), interpreters2.get(0).getClassName()); + assertEquals(DoubleEchoInterpreter.class.getName(), interpreters2.get(1).getClassName()); + assertEquals(2, interpreterGroup.getSessionNum()); + + // close session_1 + interpreterGroup.close("session_1"); + assertEquals(1, interpreterGroup.getSessionNum()); + + // close InterpreterGroup + interpreterGroup.close(); + assertEquals(0, interpreterGroup.getSessionNum()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java index e376809..f3a30fb 100644 --- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; @@ -29,7 +30,7 @@ import org.junit.Test; public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener { private File tmpDir; private File fileChanged; - private int numChanged; + private AtomicInteger numChanged; private InterpreterOutputChangeWatcher watcher; @Before @@ -40,7 +41,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); tmpDir.mkdirs(); fileChanged = null; - numChanged = 0; + numChanged = new AtomicInteger(0); } @After @@ -66,7 +67,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan @Test public void test() throws IOException, InterruptedException { assertNull(fileChanged); - assertEquals(0, numChanged); + assertEquals(0, numChanged.get()); Thread.sleep(1000); // create new file @@ -92,14 +93,14 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan } assertNotNull(fileChanged); - assertEquals(1, numChanged); + assertEquals(1, numChanged.get()); } @Override public void fileChanged(File file) { fileChanged = file; - numChanged++; + numChanged.incrementAndGet(); synchronized(this) { notify(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java new file mode 100644 index 0000000..c74760f --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.junit.Test; +import org.sonatype.aether.RepositoryException; +import org.sonatype.aether.repository.RemoteRepository; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + + +public class InterpreterSettingManagerTest extends AbstractInterpreterTest { + + @Test + public void testInitInterpreterSettingManager() throws IOException, RepositoryException { + assertEquals(2, interpreterSettingManager.get().size()); + InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test"); + assertEquals("test", interpreterSetting.getName()); + assertEquals("test", interpreterSetting.getGroup()); + assertEquals(2, interpreterSetting.getInterpreterInfos().size()); + // 3 other builtin properties: + // * zeppelin.interpreter.output.limit + // * zeppelin.interpreter.localRepo + // * zeppelin.interpreter.max.poolsize + assertEquals(6, interpreterSetting.getJavaProperties().size()); + assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1")); + assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2")); + assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3")); + assertEquals("shared", interpreterSetting.getOption().perNote); + 
assertEquals("shared", interpreterSetting.getOption().perUser); + assertEquals(0, interpreterSetting.getDependencies().size()); + assertNotNull(interpreterSetting.getAngularObjectRegistryListener()); + assertNotNull(interpreterSetting.getRemoteInterpreterProcessListener()); + assertNotNull(interpreterSetting.getAppEventListener()); + assertNotNull(interpreterSetting.getDependencyResolver()); + assertNotNull(interpreterSetting.getInterpreterSettingManager()); + + List<RemoteRepository> repositories = interpreterSettingManager.getRepositories(); + assertEquals(2, repositories.size()); + assertEquals("central", repositories.get(0).getId()); + + // Load it again + InterpreterSettingManager interpreterSettingManager2 = new InterpreterSettingManager(conf, + mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); + assertEquals(2, interpreterSettingManager2.get().size()); + interpreterSetting = interpreterSettingManager2.getByName("test"); + assertEquals("test", interpreterSetting.getName()); + assertEquals("test", interpreterSetting.getGroup()); + assertEquals(2, interpreterSetting.getInterpreterInfos().size()); + assertEquals(6, interpreterSetting.getJavaProperties().size()); + assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1")); + assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2")); + assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3")); + assertEquals("shared", interpreterSetting.getOption().perNote); + assertEquals("shared", interpreterSetting.getOption().perUser); + assertEquals(0, interpreterSetting.getDependencies().size()); + + repositories = interpreterSettingManager2.getRepositories(); + assertEquals(2, repositories.size()); + assertEquals("central", repositories.get(0).getId()); + + } + + @Test + public void testCreateUpdateRemoveSetting() throws IOException { + // create 
new interpreter setting + InterpreterOption option = new InterpreterOption(); + option.setPerNote("scoped"); + option.setPerUser("scoped"); + Map<String, InterpreterProperty> properties = new HashMap<>(); + properties.put("property_4", new InterpreterProperty("property_4","value_4")); + + try { + interpreterSettingManager.createNewSetting("test2", "test", new ArrayList<Dependency>(), option, properties); + fail("Should fail due to interpreter already existed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("already existed")); + } + + interpreterSettingManager.createNewSetting("test3", "test", new ArrayList<Dependency>(), option, properties); + assertEquals(3, interpreterSettingManager.get().size()); + InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test3"); + assertEquals("test3", interpreterSetting.getName()); + assertEquals("test", interpreterSetting.getGroup()); + // 3 other builtin properties: + // * zeppelin.interpreter.output.limit + // * zeppelin.interpreter.localRepo + // * zeppelin.interpreter.max.poolsize + assertEquals(4, interpreterSetting.getJavaProperties().size()); + assertEquals("value_4", interpreterSetting.getJavaProperties().getProperty("property_4")); + assertEquals("scoped", interpreterSetting.getOption().perNote); + assertEquals("scoped", interpreterSetting.getOption().perUser); + assertEquals(0, interpreterSetting.getDependencies().size()); + assertNotNull(interpreterSetting.getAngularObjectRegistryListener()); + assertNotNull(interpreterSetting.getRemoteInterpreterProcessListener()); + assertNotNull(interpreterSetting.getAppEventListener()); + assertNotNull(interpreterSetting.getDependencyResolver()); + assertNotNull(interpreterSetting.getInterpreterSettingManager()); + + // load it again, it should be saved in interpreter-setting.json. 
So we can restore it properly + InterpreterSettingManager interpreterSettingManager2 = new InterpreterSettingManager(conf, + mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); + assertEquals(3, interpreterSettingManager2.get().size()); + interpreterSetting = interpreterSettingManager2.getByName("test3"); + assertEquals("test3", interpreterSetting.getName()); + assertEquals("test", interpreterSetting.getGroup()); + assertEquals(6, interpreterSetting.getJavaProperties().size()); + assertEquals("value_4", interpreterSetting.getJavaProperties().getProperty("property_4")); + assertEquals("scoped", interpreterSetting.getOption().perNote); + assertEquals("scoped", interpreterSetting.getOption().perUser); + assertEquals(0, interpreterSetting.getDependencies().size()); + + // update interpreter setting + InterpreterOption newOption = new InterpreterOption(); + newOption.setPerNote("scoped"); + newOption.setPerUser("isolated"); + Map<String, InterpreterProperty> newProperties = new HashMap<>(properties); + newProperties.put("property_4", new InterpreterProperty("property_4", "new_value_4")); + List<Dependency> newDependencies = new ArrayList<>(); + newDependencies.add(new Dependency("com.databricks:spark-avro_2.11:3.1.0")); + interpreterSettingManager.setPropertyAndRestart(interpreterSetting.getId(), newOption, newProperties, newDependencies); + interpreterSetting = interpreterSettingManager.get(interpreterSetting.getId()); + assertEquals("test3", interpreterSetting.getName()); + assertEquals("test", interpreterSetting.getGroup()); + assertEquals(4, interpreterSetting.getJavaProperties().size()); + assertEquals("new_value_4", interpreterSetting.getJavaProperties().getProperty("property_4")); + assertEquals("scoped", interpreterSetting.getOption().perNote); + assertEquals("isolated", interpreterSetting.getOption().perUser); + assertEquals(1, interpreterSetting.getDependencies().size()); + 
assertNotNull(interpreterSetting.getAngularObjectRegistryListener()); + assertNotNull(interpreterSetting.getRemoteInterpreterProcessListener()); + assertNotNull(interpreterSetting.getAppEventListener()); + assertNotNull(interpreterSetting.getDependencyResolver()); + assertNotNull(interpreterSetting.getInterpreterSettingManager()); + + // restart in note page + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + interpreterSettingManager.setInterpreterBinding("user2", "note2", interpreterSettingManager.getSettingIds()); + interpreterSettingManager.setInterpreterBinding("user3", "note3", interpreterSettingManager.getSettingIds()); + // create 3 sessions as it is scoped mode + interpreterSetting.getOption().setPerUser("scoped"); + interpreterSetting.getDefaultInterpreter("user1", "note1"); + interpreterSetting.getDefaultInterpreter("user2", "note2"); + interpreterSetting.getDefaultInterpreter("user3", "note3"); + InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1"); + assertEquals(3, interpreterGroup.getSessionNum()); + // only close user1's session + interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + assertEquals(2, interpreterGroup.getSessionNum()); + // close all the sessions + interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "anonymous"); + assertEquals(0, interpreterGroup.getSessionNum()); + + // remove interpreter setting + interpreterSettingManager.remove(interpreterSetting.getId()); + assertEquals(2, interpreterSettingManager.get().size()); + + // load it again + InterpreterSettingManager interpreterSettingManager3 = new InterpreterSettingManager(new ZeppelinConfiguration(), + mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); + assertEquals(2, interpreterSettingManager3.get().size()); + + } + + @Test + public void 
testInterpreterBinding() throws IOException { + assertNull(interpreterSettingManager.getInterpreterBinding("note1")); + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds()); + assertEquals(interpreterSettingManager.getInterpreterSettingIds(), interpreterSettingManager.getInterpreterBinding("note1")); + } + + @Test + public void testUpdateInterpreterBinding_PerNoteShared() throws IOException { + InterpreterSetting defaultInterpreterSetting = interpreterSettingManager.get().get(0); + defaultInterpreterSetting.getOption().setPerNote("shared"); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds()); + // create interpreter of the first bound interpreter setting + interpreterFactory.getInterpreter("user1", "note1", ""); + assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size()); + + // choose the first setting + List<String> newSettingIds = new ArrayList<>(); + newSettingIds.add(interpreterSettingManager.getInterpreterSettingIds().get(1)); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", newSettingIds); + assertEquals(newSettingIds, interpreterSettingManager.getInterpreterBinding("note1")); + // InterpreterGroup will still be alive as it is shared + assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testUpdateInterpreterBinding_PerNoteIsolated() throws IOException { + InterpreterSetting defaultInterpreterSetting = interpreterSettingManager.get().get(0); + defaultInterpreterSetting.getOption().setPerNote("isolated"); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds()); + // create interpreter of the first bound interpreter setting + interpreterFactory.getInterpreter("user1", "note1", ""); + assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size()); + + // 
choose the first setting + List<String> newSettingIds = new ArrayList<>(); + newSettingIds.add(interpreterSettingManager.getInterpreterSettingIds().get(1)); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", newSettingIds); + assertEquals(newSettingIds, interpreterSettingManager.getInterpreterBinding("note1")); + // InterpreterGroup will be closed as it belongs only to this note + assertEquals(0, defaultInterpreterSetting.getAllInterpreterGroups().size()); + + } + + @Test + public void testUpdateInterpreterBinding_PerNoteScoped() throws IOException { + InterpreterSetting defaultInterpreterSetting = interpreterSettingManager.get().get(0); + defaultInterpreterSetting.getOption().setPerNote("scoped"); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreterBinding("user1", "note2", interpreterSettingManager.getInterpreterSettingIds()); + // create 2 interpreters of the first bound interpreter setting, one for note1 and one for note2 + interpreterFactory.getInterpreter("user1", "note1", ""); + interpreterFactory.getInterpreter("user1", "note2", ""); + assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size()); + assertEquals(2, defaultInterpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // rebind note1 to the second setting only (index 1) + List<String> newSettingIds = new ArrayList<>(); + newSettingIds.add(interpreterSettingManager.getInterpreterSettingIds().get(1)); + + interpreterSettingManager.setInterpreterBinding("user1", "note1", newSettingIds); + assertEquals(newSettingIds, interpreterSettingManager.getInterpreterBinding("note1")); + // InterpreterGroup will still be alive, but the session belonging to note1 will be closed + assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + } +} 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java new file mode 100644 index 0000000..3c061a9 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class InterpreterSettingTest { + + @Test + public void testCreateInterpreters() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.SHARED); + interpreterOption.setRemote(false); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create default interpreter for user1 and note1 + assertEquals(EchoInterpreter.class.getName(), interpreterSetting.getDefaultInterpreter("user1", "note1").getClassName()); + + // create interpreter echo for user1 and note1 + assertEquals(EchoInterpreter.class.getName(), interpreterSetting.getInterpreter("user1", "note1", "echo").getClassName()); + assertEquals(interpreterSetting.getDefaultInterpreter("user1", "note1"), interpreterSetting.getInterpreter("user1", "note1", "echo")); + + // create interpreter double_echo for user1 and note1 + assertEquals(DoubleEchoInterpreter.class.getName(), interpreterSetting.getInterpreter("user1", "note1", "double_echo").getClassName()); + + // create non-existed interpreter + assertNull(interpreterSetting.getInterpreter("user1", "note1", "invalid_echo")); + } + + @Test + public void 
testSharedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.SHARED); + interpreterOption.setRemote(false); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create default interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + // create default interpreter for user2 and note1 + interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + // create default interpreter for user1 and note2 + interpreterSetting.getDefaultInterpreter("user1", "note2"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + // only 1 session is created, this session is shared across users and notes + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // arguments are in (user, note) order, as in every other closeInterpreters call in this class + interpreterSetting.closeInterpreters("user1", "note1"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerUserScopedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.SCOPED); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", 
true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user2 and note1 + interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + interpreterSetting.closeInterpreters("user1", "note1"); + // InterpreterGroup is still there, but one session is removed + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + interpreterSetting.closeInterpreters("user2", "note1"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerNoteScopedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerNote(InterpreterOption.SCOPED); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, 
Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user1 and note2 + interpreterSetting.getDefaultInterpreter("user1", "note2"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + interpreterSetting.closeInterpreters("user1", "note1"); + // InterpreterGroup is still there, but one session is removed + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + interpreterSetting.closeInterpreters("user1", "note2"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerUserIsolatedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.ISOLATED); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting 
interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user2 and note1 + interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); + + // Each user own one InterpreterGroup and one session per InterpreterGroup + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(1).getSessionNum()); + + interpreterSetting.closeInterpreters("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + interpreterSetting.closeInterpreters("user2", "note1"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerNoteIsolatedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerNote(InterpreterOption.ISOLATED); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + 
.setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user1 and note2 + interpreterSetting.getDefaultInterpreter("user1", "note2"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); + // Each note owns one InterpreterGroup and one session per InterpreterGroup + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(1).getSessionNum()); + + interpreterSetting.closeInterpreters("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + interpreterSetting.closeInterpreters("user1", "note2"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerUserIsolatedPerNoteScopedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.ISOLATED); + interpreterOption.setPerNote(InterpreterOption.SCOPED); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + 
interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + interpreterSetting.getDefaultInterpreter("user1", "note2"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user2 and note1 + interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); + + // group1 for user1 has 2 sessions, and group2 for user2 has 1 session + assertEquals(interpreterSetting.getInterpreterGroup("user1", "note1"), interpreterSetting.getInterpreterGroup("user1", "note2")); + assertEquals(2, interpreterSetting.getInterpreterGroup("user1", "note1").getSessionNum()); + assertEquals(2, interpreterSetting.getInterpreterGroup("user1", "note2").getSessionNum()); + assertEquals(1, interpreterSetting.getInterpreterGroup("user2", "note1").getSessionNum()); + + // close one session for user1 + interpreterSetting.closeInterpreters("user1", "note1"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").getSessionNum()); + + // close another session for user1 + interpreterSetting.closeInterpreters("user1", "note2"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + // close session for user2 + interpreterSetting.closeInterpreters("user2", "note1"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerUserIsolatedPerNoteIsolatedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.ISOLATED); + interpreterOption.setPerNote(InterpreterOption.ISOLATED); + interpreterOption.setRemote(true); + 
InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + // create interpreter for user1 and note2 + interpreterSetting.getDefaultInterpreter("user1", "note2"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); + + // create interpreter for user2 and note1 + interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertEquals(3, interpreterSetting.getAllInterpreterGroups().size()); + + // create interpreter for user2 and note2 + interpreterSetting.getDefaultInterpreter("user2", "note2"); + assertEquals(4, interpreterSetting.getAllInterpreterGroups().size()); + + for (InterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) { + // each InterpreterGroup has one session + assertEquals(1, interpreterGroup.getSessionNum()); + } + + // close one session for user1 and note1 + interpreterSetting.closeInterpreters("user1", "note1"); + assertEquals(3, interpreterSetting.getAllInterpreterGroups().size()); + + // close one session for user1 and note2 + interpreterSetting.closeInterpreters("user1", "note2"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); + + // close one session for user2 and note1 + 
interpreterSetting.closeInterpreters("user2", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + // close one session for user2 and note2 + interpreterSetting.closeInterpreters("user2", "note2"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } + + @Test + public void testPerUserScopedPerNoteScopedMode() { + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setPerUser(InterpreterOption.SCOPED); + interpreterOption.setPerNote(InterpreterOption.SCOPED); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + InterpreterSetting interpreterSetting = new InterpreterSetting.Builder() + .setId("id") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .create(); + + // create interpreter for user1 and note1 + interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user1 and note2 + interpreterSetting.getDefaultInterpreter("user1", "note2"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user2 and note1 + interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(3, 
interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // create interpreter for user2 and note2 + interpreterSetting.getDefaultInterpreter("user2", "note2"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertEquals(4, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // close one session for user1 and note1 + interpreterSetting.closeInterpreters("user1", "note1"); + assertEquals(3, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // close one session for user1 and note2 + interpreterSetting.closeInterpreters("user1", "note2"); + assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // close one session for user2 and note1 + interpreterSetting.closeInterpreters("user2", "note1"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); + + // close one session for user2 and note2 + interpreterSetting.closeInterpreters("user2", "note2"); + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java index 305268c..d46eaa7 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java @@ -24,13 +24,14 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +//TODO(zjffdu) add more test for Interpreter which is a very important class public class InterpreterTest { @Test public void testDefaultProperty() { Properties p = new Properties(); 
p.put("p1", "v1"); - Interpreter intp = new DummyInterpreter(p); + Interpreter intp = new EchoInterpreter(p); assertEquals(1, intp.getProperty().size()); assertEquals("v1", intp.getProperty().get("p1")); @@ -41,7 +42,7 @@ public class InterpreterTest { public void testOverriddenProperty() { Properties p = new Properties(); p.put("p1", "v1"); - Interpreter intp = new DummyInterpreter(p); + Interpreter intp = new EchoInterpreter(p); Properties overriddenProperty = new Properties(); overriddenProperty.put("p1", "v2"); intp.setProperty(overriddenProperty); @@ -73,7 +74,7 @@ public class InterpreterTest { Properties p = new Properties(); p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, #{replName}, #{noteId}, #{user}," + " #{authenticationInfo}"); - Interpreter intp = new DummyInterpreter(p); + Interpreter intp = new EchoInterpreter(p); intp.setUserName(user); String actual = intp.getProperty("p1"); InterpreterContext.remove(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SleepInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SleepInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SleepInterpreter.java new file mode 100644 index 0000000..9deafcf --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SleepInterpreter.java @@ -0,0 +1,60 @@ +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.util.Properties; + +/** + * Interpreter that only accept long value and sleep for such period + */ +public class SleepInterpreter extends Interpreter { + + public SleepInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + + } + + 
@Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + try { + Thread.sleep(Long.parseLong(st)); + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } catch (Exception e) { + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public Scheduler getScheduler() { + if (Boolean.parseBoolean(property.getProperty("zeppelin.SleepInterpreter.parallel", "false"))) { + return SchedulerFactory.singleton().createOrGetParallelScheduler( + "Parallel-" + SleepInterpreter.class.getName(), 10); + } + return super.getScheduler(); + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java new file mode 100644 index 0000000..e934f1a --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java @@ -0,0 +1,86 @@ +package org.apache.zeppelin.interpreter.install; + +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class InstallInterpreterTest { + private File tmpDir; + private InstallInterpreter installer; + private File interpreterBaseDir; + + @Before + public void setUp() throws IOException { + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + new File(tmpDir, "conf").mkdirs(); + interpreterBaseDir = new File(tmpDir, "interpreter"); + File localRepoDir = new File(tmpDir, "local-repo"); + interpreterBaseDir.mkdir(); + localRepoDir.mkdir(); + + File interpreterListFile = new File(tmpDir, "conf/interpreter-list"); + + + // create interpreter list file + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); + + String interpreterList = ""; + interpreterList += "intp1 org.apache.commons:commons-csv:1.1 test interpreter 1\n"; + interpreterList += "intp2 org.apache.commons:commons-math3:3.6.1 test interpreter 2\n"; + + FileUtils.writeStringToFile(new File(tmpDir, "conf/interpreter-list"), interpreterList); + + installer = new InstallInterpreter(interpreterListFile, interpreterBaseDir, localRepoDir + .getAbsolutePath()); + } + + @After + public void tearDown() throws IOException { + FileUtils.deleteDirectory(tmpDir); + } + + + @Test + public void 
testList() { + assertEquals(2, installer.list().size()); + } + + @Test + public void install() { + assertEquals(0, interpreterBaseDir.listFiles().length); + + installer.install("intp1"); + assertTrue(new File(interpreterBaseDir, "intp1").isDirectory()); + } + + @Test + public void installAll() { + installer.installAll(); + assertTrue(new File(interpreterBaseDir, "intp1").isDirectory()); + assertTrue(new File(interpreterBaseDir, "intp2").isDirectory()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java new file mode 100644 index 0000000..a533c12 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java @@ -0,0 +1,106 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
package org.apache.zeppelin.interpreter.mock;

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Test interpreter that echoes its input prefixed with {@code "repl1: "}.
 *
 * <p>Two inputs are treated specially by {@link #interpret(String, InterpreterContext)}:
 * <ul>
 *   <li>{@code getId} - the result text is this instance's {@code hashCode()};</li>
 *   <li>{@code sleep <millis>} - the calling thread sleeps for the given number of
 *       milliseconds before the echo result is produced.</li>
 * </ul>
 */
public class MockInterpreter1 extends Interpreter {

  Map<String, Object> vars = new HashMap<>();

  /** Set by {@link #open()} and cleared by {@link #close()}. */
  boolean open;

  public MockInterpreter1(Properties property) {
    super(property);
  }

  @Override
  public void open() {
    open = true;
  }

  @Override
  public void close() {
    open = false;
  }

  /**
   * @return {@code true} after {@link #open()} and until {@link #close()} is called
   */
  public boolean isOpen() {
    return open;
  }

  /**
   * Produces a {@code SUCCESS} result for the given statement and, when the context
   * exposes a resource pool, stores that result in the pool under the name
   * {@code "result"} for the context's note and paragraph.
   *
   * @param st statement to interpret
   * @param context context supplying the note id, paragraph id and resource pool
   * @return a {@code SUCCESS} result; its text is the instance hash code for
   *     {@code getId}, otherwise {@code "repl1: " + st}
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    InterpreterResult result;

    if ("getId".equals(st)) {
      // get unique id of this interpreter instance
      result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
    } else {
      if (st.startsWith("sleep")) {
        try {
          Thread.sleep(Integer.parseInt(st.split(" ")[1]));
        } catch (InterruptedException e) {
          // Stop sleeping but keep the interrupt visible to the calling thread
          // instead of silently discarding it.
          Thread.currentThread().interrupt();
        }
      }
      result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
    }

    if (context.getResourcePool() != null) {
      context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
    }

    return result;
  }

  @Override
  public void cancel(InterpreterContext context) {
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }

  /**
   * @return the FIFO scheduler registered under {@code "test_" + hashCode()}
   */
  @Override
  public Scheduler getScheduler() {
    return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
  }

  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor,
      InterpreterContext interpreterContext) {
    return null;
  }
}
package org.apache.zeppelin.interpreter.mock;


import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Test interpreter whose every result is {@code SUCCESS} with the text
 * {@code "repl11: "} followed by the submitted statement.
 */
public class MockInterpreter11 extends Interpreter {

  /** Tracks whether {@link #open()} has been called without a later {@link #close()}. */
  boolean open;

  Map<String, Object> vars = new HashMap<>();

  public MockInterpreter11(Properties property) {
    super(property);
  }

  /** Marks this interpreter as opened. */
  @Override
  public void open() {
    this.open = true;
  }

  /** Marks this interpreter as closed. */
  @Override
  public void close() {
    this.open = false;
  }

  /**
   * @return the current value of the open flag
   */
  public boolean isOpen() {
    return this.open;
  }

  /**
   * Echoes the statement back with the {@code "repl11: "} prefix.
   *
   * @param st statement to echo
   * @param context unused
   * @return a {@code SUCCESS} result carrying the prefixed statement
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    final String echoed = "repl11: " + st;
    return new InterpreterResult(InterpreterResult.Code.SUCCESS, echoed);
  }

  /** No-op: there is nothing to cancel. */
  @Override
  public void cancel(InterpreterContext context) {
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  /**
   * @return always {@code 0}
   */
  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }

  /**
   * @return the FIFO scheduler registered under {@code "test_" + hashCode()}
   */
  @Override
  public Scheduler getScheduler() {
    final String schedulerName = "test_" + hashCode();
    return SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName);
  }

  /**
   * @return always {@code null}; no completions are offered
   */
  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor,
      InterpreterContext interpreterContext) {
    return null;
  }
}
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java new file mode 100644 index 0000000..f36df56 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.zeppelin.interpreter.mock;

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Test interpreter that echoes its input prefixed with {@code "repl2: "}.
 *
 * <p>Two inputs are treated specially by {@link #interpret(String, InterpreterContext)}:
 * <ul>
 *   <li>{@code getId} - the result text is this instance's {@code hashCode()};</li>
 *   <li>{@code sleep <millis>} - the calling thread sleeps for the given number of
 *       milliseconds before the echo result is produced.</li>
 * </ul>
 */
public class MockInterpreter2 extends Interpreter{
  Map<String, Object> vars = new HashMap<>();

  /** Set by {@link #open()} and cleared by {@link #close()}. */
  boolean open;

  public MockInterpreter2(Properties property) {
    super(property);
  }

  @Override
  public void open() {
    open = true;
  }

  @Override
  public void close() {
    open = false;
  }

  /**
   * @return {@code true} after {@link #open()} and until {@link #close()} is called
   */
  public boolean isOpen() {
    return open;
  }

  /**
   * Produces a {@code SUCCESS} result for the given statement and, when the context
   * exposes a resource pool, stores that result in the pool under the name
   * {@code "result"} for the context's note and paragraph.
   *
   * @param st statement to interpret
   * @param context context supplying the note id, paragraph id and resource pool
   * @return a {@code SUCCESS} result; its text is the instance hash code for
   *     {@code getId}, otherwise {@code "repl2: " + st}
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    InterpreterResult result;

    if ("getId".equals(st)) {
      // get unique id of this interpreter instance
      result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
    } else {
      if (st.startsWith("sleep")) {
        try {
          Thread.sleep(Integer.parseInt(st.split(" ")[1]));
        } catch (InterruptedException e) {
          // Stop sleeping but keep the interrupt visible to the calling thread
          // instead of silently discarding it.
          Thread.currentThread().interrupt();
        }
      }
      result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
    }

    if (context.getResourcePool() != null) {
      context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
    }
    return result;
  }

  @Override
  public void cancel(InterpreterContext context) {
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }

  /**
   * @return the FIFO scheduler registered under {@code "test_" + hashCode()}
   */
  @Override
  public Scheduler getScheduler() {
    return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
  }

  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor,
      InterpreterContext interpreterContext) {
    return null;
  }
}
package org.apache.zeppelin.interpreter.remote;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.*;

/**
 * Tests that {@link AppendOutputRunner} forwards buffered append-output events to a
 * {@link RemoteInterpreterProcessListener}, merging events of the same note/paragraph
 * and warning when the buffered payload is large.
 */
public class AppendOutputRunnerTest {

  private static final int NUM_EVENTS = 10000;
  private static final int NUM_CLUBBED_EVENTS = 100;
  /** Upper bound, in milliseconds, for every polling loop in this class. */
  private static final long WAIT_TIMEOUT_MS = 2000;
  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
  private static ScheduledFuture<?> future = null;
  /* Incremented from the mocked listener callback and polled by the test
   * thread in 'loopForCompletingEvents'. It must be volatile, otherwise the
   * polling loop might never observe the update and could run forever.
   */
  private volatile static int numInvocations = 0;

  /** Cancels the runner scheduled by the previous test, if any. */
  @After
  public void afterEach() {
    if (future != null) {
      future.cancel(true);
    }
  }

  @Test
  public void testSingleEvent() throws InterruptedException {
    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
    String[][] buffer = {{"note", "para", "data\n"}};

    loopForCompletingEvents(listener, 1, buffer);
    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
    verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
  }

  @Test
  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
    String note1 = "note1";
    String para1 = "para1";
    String[][] buffer = {
        {note1, para1, "data1\n"},
        {note1, para1, "data2\n"},
        {note1, para1, "data3\n"}
    };

    loopForCompletingEvents(listener, 1, buffer);
    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
  }

  @Test
  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
    String note1 = "note1";
    String note2 = "note2";
    String para1 = "para1";
    String para2 = "para2";
    String[][] buffer = {
        {note1, para1, "data1\n"},
        {note1, para2, "data2\n"},
        {note2, para1, "data3\n"},
        {note2, para2, "data4\n"}
    };
    loopForCompletingEvents(listener, 4, buffer);

    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
    verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
    verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
    verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
  }

  @Test
  public void testClubbedData() throws InterruptedException {
    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
    AppendOutputRunner runner = new AppendOutputRunner(listener);
    future = service.scheduleWithFixedDelay(runner, 0,
        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
    Thread thread = new Thread(new BombardEvents(runner));
    thread.start();
    thread.join();
    Thread.sleep(1000);

    /* NUM_CLUBBED_EVENTS is a heuristic number.
     * It has been observed that for 10,000 continuous event
     * calls, 30-40 Web-socket calls are made. Keeping
     * the unit-test to a pessimistic 100 web-socket calls.
     */
    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
  }

  /**
   * Buffers a large payload, runs the runner once on the test thread and expects
   * exactly two WARN log entries, the last of which reports the payload size.
   */
  @Test
  public void testWarnLoggerForLargeData() throws InterruptedException {
    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
    AppendOutputRunner runner = new AppendOutputRunner(listener);
    String data = "data\n";
    int numEvents = 100000;

    for (int i = 0; i < numEvents; i++) {
      runner.appendBuffer("noteId", "paraId", 0, data);
    }

    TestAppender appender = new TestAppender();
    Logger rootLogger = Logger.getRootLogger();
    rootLogger.addAppender(appender);
    try {
      runner.run();

      int warnLogCounter;
      LoggingEvent sizeWarnLogEntry = null;
      long startTimeMs = System.currentTimeMillis();
      while (true) {
        warnLogCounter = 0;
        for (LoggingEvent logEntry : appender.getLog()) {
          if (Level.WARN.equals(logEntry.getLevel())) {
            sizeWarnLogEntry = logEntry;
            warnLogCounter += 1;
          }
        }
        if (warnLogCounter == 2) {
          break;
        }
        // Bound the wait so an unexpected number of WARN entries fails the
        // test instead of hanging the build.
        if (System.currentTimeMillis() - startTimeMs > WAIT_TIMEOUT_MS) {
          fail("Expected exactly 2 WARN log entries within 2 seconds but found " + warnLogCounter);
        }
      }

      String loggerString = "Processing size for buffered append-output is high: " +
          (data.length() * numEvents) + " characters.";
      assertTrue("Unexpected WARN message: " + sizeWarnLogEntry.getMessage(),
          loggerString.equals(sizeWarnLogEntry.getMessage()));
    } finally {
      // Do not leak the capturing appender into tests that run afterwards.
      rootLogger.removeAppender(appender);
    }
  }

  /** Appends {@code NUM_EVENTS} identical events for a single paragraph to the runner. */
  private static class BombardEvents implements Runnable {

    private final AppendOutputRunner runner;

    private BombardEvents(AppendOutputRunner runner) {
      this.runner = runner;
    }

    @Override
    public void run() {
      String noteId = "noteId";
      String paraId = "paraId";
      for (int i = 0; i < NUM_EVENTS; i++) {
        runner.appendBuffer(noteId, paraId, 0, "data\n");
      }
    }
  }

  /** log4j appender that records every event it receives. */
  private static class TestAppender extends AppenderSkeleton {
    private final List<LoggingEvent> log = new ArrayList<>();

    @Override
    public boolean requiresLayout() {
      return false;
    }

    @Override
    protected void append(final LoggingEvent loggingEvent) {
      log.add(loggingEvent);
    }

    @Override
    public void close() {
    }

    /**
     * Returns a snapshot of the recorded events. Synchronized on this appender,
     * the monitor log4j 1.2's {@code AppenderSkeleton.doAppend} holds while
     * calling {@link #append}, so the copy cannot race with a concurrent append.
     */
    public synchronized List<LoggingEvent> getLog() {
      return new ArrayList<>(log);
    }
  }

  /** Makes every {@code onOutputAppend} call on the mock increment {@code numInvocations}. */
  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        numInvocations += 1;
        return null;
      }
    }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
  }

  /**
   * Buffers the given events, schedules a fresh runner and polls until the listener
   * has been invoked {@code numTimes} times, failing after {@code WAIT_TIMEOUT_MS}.
   *
   * @param listener mocked listener that receives the flushed output
   * @param numTimes expected number of {@code onOutputAppend} invocations
   * @param buffer events as {@code {noteId, paragraphId, data}} triples
   */
  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
      int numTimes, String[][] buffer) {
    numInvocations = 0;
    prepareInvocationCounts(listener);
    AppendOutputRunner runner = new AppendOutputRunner(listener);
    for (String[] bufferElement : buffer) {
      runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
    }
    future = service.scheduleWithFixedDelay(runner, 0,
        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
    long startTimeMs = System.currentTimeMillis();
    while (numInvocations != numTimes) {
      if (System.currentTimeMillis() - startTimeMs > WAIT_TIMEOUT_MS) {
        fail("Buffered events were not sent for 2 seconds");
      }
    }
  }
}