http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java new file mode 100644 index 0000000..61e4ef0 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class RemoteAngularObjectTest implements AngularObjectRegistryListener { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private RemoteInterpreter intp; + private InterpreterContext context; + private RemoteAngularObjectRegistry localRegistry; + private InterpreterSetting interpreterSetting; + + private AtomicInteger onAdd; + private AtomicInteger onUpdate; + private AtomicInteger onRemove; + + @Before + public void setUp() throws Exception { + onAdd = new AtomicInteger(0); + onUpdate = new AtomicInteger(0); + onRemove = new AtomicInteger(0); + + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterAngular.class.getName(), "mock", true, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); + interpreterSetting = new InterpreterSetting.Builder() + .setId("test") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .setRunner(runner) + .setInterpreterDir("../interpeters/test") + .create(); + + intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + localRegistry = (RemoteAngularObjectRegistry) intp.getInterpreterGroup().getAngularObjectRegistry(); + + context = new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intp.getInterpreterGroup().getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + intp.open(); + + } + + @After + public void tearDown() throws Exception { + interpreterSetting.close(); + } + + @Test + public void testAngularObjectInterpreterSideCRUD() throws InterruptedException { + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + assertEquals("0", result[1]); // num watcher called + + // create object + ret = intp.interpret("add n1 v1", context); + Thread.sleep(500); + result = ret.message().get(0).getData().split(" "); + assertEquals("1", result[0]); // size of registry + assertEquals("0", result[1]); // num watcher called + assertEquals("v1", localRegistry.get("n1", "note", null).get()); + + // update object + ret = intp.interpret("update n1 v11", context); + result = ret.message().get(0).getData().split(" "); + Thread.sleep(500); + assertEquals("1", result[0]); // size of registry + assertEquals("1", result[1]); // num watcher called + assertEquals("v11", localRegistry.get("n1", "note", null).get()); + + // remove object + ret = intp.interpret("remove n1", context); + result = ret.message().get(0).getData().split(" "); + Thread.sleep(500); + assertEquals("0", result[0]); // size of registry + assertEquals("1", result[1]); // num watcher called + assertEquals(null, localRegistry.get("n1", "note", null)); + } + + @Test + public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException { + // test if angularobject removal from server side propagate to interpreter process's registry. + // will happen when notebook is removed. + + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + + // create object + ret = intp.interpret("add n1 v1", context); + Thread.sleep(500); + result = ret.message().get(0).getData().split(" "); + assertEquals("1", result[0]); // size of registry + assertEquals("v1", localRegistry.get("n1", "note", null).get()); + + // remove object in local registry. + localRegistry.removeAndNotifyRemoteProcess("n1", "note", null); + ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + } + + @Test + public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException { + // test if angularobject add from server side propagate to interpreter process's registry. + // will happen when zeppelin server loads notebook and restore the object into registry + + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + + // create object + localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); + + // get from remote registry + ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + result = ret.message().get(0).getData().split(" "); + assertEquals("1", result[0]); // size of registry + } + + @Override + public void onAdd(String interpreterGroupId, AngularObject object) { + onAdd.incrementAndGet(); + } + + @Override + public void onUpdate(String interpreterGroupId, AngularObject object) { + onUpdate.incrementAndGet(); + } + + @Override + public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { + onRemove.incrementAndGet(); + } + +}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java new file mode 100644 index 0000000..49aa7aa --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.junit.Test; + +import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteInterpreterEventPollerTest { + + @Test + public void shouldClearUnreadEventsOnShutdown() throws Exception { + RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess(); + RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null); + + eventPoller.setInterpreterProcess(interpreterProc); + eventPoller.shutdown(); + eventPoller.start(); + eventPoller.join(); + + assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType()); + } + + private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception { + RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent(); + RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, ""); + RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class); + RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class); + + when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents); + when(intProc.getClient()).thenReturn(client); + + return intProc; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java new file mode 100644 index 0000000..1687060 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + + +/** + * Test for remote interpreter output stream + */ +public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private InterpreterSetting interpreterSetting; + + @Before + public void setUp() throws Exception { + InterpreterOption interpreterOption = new InterpreterOption(); + + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterOutputStream.class.getName(), "mock", true, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); + interpreterSetting = new InterpreterSetting.Builder() + .setId("test") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .setRunner(runner) + .setInterpreterDir("../interpeters/test") + .create(); + } + + @After + public void tearDown() throws Exception { + interpreterSetting.close(); + } + + private InterpreterContext createInterpreterContext() { + return new InterpreterContext( + "noteId", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + null, + null, + new LinkedList<InterpreterContextRunner>(), null); + } + + @Test + public void testInterpreterResultOnly() { + RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult", ret.message().get(0).getData()); + + ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult2", ret.message().get(0).getData()); + + ret = intp.interpret("ERROR::staticresult3", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("staticresult3", ret.message().get(0).getData()); + } + + @Test + public void testInterpreterOutputStreamOnly() { + RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("streamresult", ret.message().get(0).getData()); + + ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("streamresult2", ret.message().get(0).getData()); + } + + @Test + public void testInterpreterResultOutputStreamMixed() { + RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("stream", ret.message().get(0).getData()); + assertEquals("static", ret.message().get(1).getData()); + } + + @Test + public void testOutputType() { + RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + + InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); + assertEquals("hello", ret.message().get(0).getData()); + + ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); + assertEquals("hello", ret.message().get(0).getData()); + + ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); + assertEquals("hello", ret.message().get(0).getData()); + assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType()); + assertEquals("world", ret.message().get(1).getData()); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, int index, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { + + } + + @Override + public void onOutputClear(String noteId, String paragraphId) { + + } + + @Override + public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { + + } + + @Override + public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { + if (callback != null) { + callback.onFinished(new LinkedList<>()); + } + } + + @Override + public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { + + } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java new file mode 100644 index 0000000..ae98dc3 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -0,0 +1,520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class RemoteInterpreterTest { + + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private InterpreterSetting interpreterSetting; + + @Before + public void setUp() throws Exception { + InterpreterOption interpreterOption = new InterpreterOption(); + + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo3 = new InterpreterInfo(SleepInterpreter.class.getName(), "sleep", false, new HashMap<String, Object>()); + InterpreterInfo interpreterInfo4 = new InterpreterInfo(GetEnvPropertyInterpreter.class.getName(), "get", false, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + interpreterInfos.add(interpreterInfo2); + interpreterInfos.add(interpreterInfo3); + interpreterInfos.add(interpreterInfo4); + InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); + interpreterSetting = new InterpreterSetting.Builder() + .setId("test") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .setRunner(runner) + .setInterpreterDir("../interpeters/test") + .create(); + } + + @After + public void tearDown() throws Exception { + interpreterSetting.close(); + } + + @Test + public void testSharedMode() { + interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); + + Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); + Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertTrue(interpreter1 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; + assertTrue(interpreter2 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; + + InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); + assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType()); + assertEquals(0, remoteInterpreter1.getProgress(context1)); + assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess()); + assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); + + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(), + remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); + + // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the + // RemoteInterpreterProcess leakage. + remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId()); + assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess()); + try { + assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); + fail("Should not be able to call interpret after interpreter is closed"); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + fail("Should not be able to call getProgress after RemoterInterpreterProcess is stoped"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testScopedMode() { + interpreterSetting.getOption().setPerUser(InterpreterOption.SCOPED); + + Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); + Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertTrue(interpreter1 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; + assertTrue(interpreter2 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; + + InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType()); + assertEquals(0, remoteInterpreter1.getProgress(context1)); + + assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess()); + assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); + + assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(), + remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); + // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the + // RemoteInterpreterProcess leakage. + remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId()); + try { + assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); + fail("Should not be able to call interpret after interpreter is closed"); + } catch (Exception e) { + e.printStackTrace(); + } + + assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId()); + try { + assertEquals("hello", remoteInterpreter2.interpret("hello", context1)); + fail("Should not be able to call interpret after interpreter is closed"); + } catch (Exception e) { + e.printStackTrace(); + } + assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); + } + + @Test + public void testIsolatedMode() { + interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED); + + Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); + Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1"); + assertTrue(interpreter1 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; + assertTrue(interpreter2 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; + + InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType()); + assertEquals(0, remoteInterpreter1.getProgress(context1)); + assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess()); + assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); + + assertNotEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(), + remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); + // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the + // RemoteInterpreterProcess leakage. + remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId()); + assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess()); + assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); + try { + remoteInterpreter1.interpret("hello", context1); + fail("Should not be able to call getProgress after interpreter is closed"); + } catch (Exception e) { + e.printStackTrace(); + } + + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId()); + try { + assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); + fail("Should not be able to call interpret after interpreter is closed"); + } catch (Exception e) { + e.printStackTrace(); + } + assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); + + } + +// @Test +// public void testExecuteIncorrectPrecode() throws TTransportException, IOException { +// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); +// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "fail test"); +// +// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); +// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", +// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), +// null, null, new ArrayList<InterpreterContextRunner>(), null); +// assertEquals(Code.ERROR, interpreter1.interpret("10", context1).code()); +// } +// +// @Test +// public void testExecuteCorrectPrecode() throws TTransportException, IOException { +// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); +// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "1"); +// +// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); +// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", +// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), +// null, null, new ArrayList<InterpreterContextRunner>(), null); +// assertEquals(Code.SUCCESS, interpreter1.interpret("10", context1).code()); +// } + + @Test + public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { + interpreterSetting.setProperty("zeppelin.interpreter.echo.fail", "true"); + interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); + + Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); + assertTrue(interpreter1 instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; + + InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + assertEquals(Code.ERROR, remoteInterpreter1.interpret("hello", context1).code()); + } + + @Test + public void testFIFOScheduler() throws InterruptedException { + interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); + // by default SleepInterpreter would use FIFOScheduler + + final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); + final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the + // time overhead of launching the process. + interpreter1.interpret("1", context1); + Thread thread1 = new Thread() { + @Override + public void run() { + assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); + } + }; + Thread thread2 = new Thread() { + @Override + public void run() { + assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); + } + }; + long start = System.currentTimeMillis(); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + long end = System.currentTimeMillis(); + assertTrue((end - start) >= 200); + } + + @Test + public void testParallelScheduler() throws InterruptedException { + interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); + interpreterSetting.setProperty("zeppelin.SleepInterpreter.parallel", "true"); + + final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); + final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + + // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the + // time overhead of launching the process. + interpreter1.interpret("1", context1); + Thread thread1 = new Thread() { + @Override + public void run() { + assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); + } + }; + Thread thread2 = new Thread() { + @Override + public void run() { + assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); + } + }; + long start = System.currentTimeMillis(); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + long end = System.currentTimeMillis(); + assertTrue((end - start) <= 200); + } + +// @Test +// public void testRunOrderPreserved() throws InterruptedException { +// Properties p = new Properties(); +// intpGroup.put("note", new LinkedList<Interpreter>()); +// +// final RemoteInterpreter intpA = createMockInterpreterA(p); +// +// intpGroup.get("note").add(intpA); +// intpA.setInterpreterGroup(intpGroup); +// +// intpA.open(); +// +// int concurrency = 3; +// final List<InterpreterResultMessage> results = new LinkedList<>(); +// +// Scheduler scheduler = intpA.getScheduler(); +// for (int i = 0; i < concurrency; i++) { +// final String jobId = Integer.toString(i); +// scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { +// private Object r; +// +// @Override +// public Object getReturn() { +// return r; +// } +// +// @Override +// public void setResult(Object results) { +// this.r = results; +// } +// +// @Override +// public int progress() { +// return 0; +// } +// +// @Override +// public Map<String, Object> info() { +// return null; +// } +// +// @Override +// protected Object jobRun() throws Throwable { +// InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( +// "note", +// jobId, +// null, +// "title", +// "text", +// new AuthenticationInfo(), +// new HashMap<String, Object>(), +// new GUI(), +// new AngularObjectRegistry(intpGroup.getId(), null), +// new LocalResourcePool("pool1"), +// new LinkedList<InterpreterContextRunner>(), null)); +// +// synchronized (results) { +// results.addAll(ret.message()); +// results.notify(); +// } +// return null; +// } +// +// @Override +// protected boolean jobAbort() { +// return false; +// } +// +// }); +// } +// +// // wait for job finished +// synchronized (results) { +// while (results.size() != concurrency) { +// results.wait(300); +// } +// } +// +// int i = 0; +// for (InterpreterResultMessage result : results) { +// assertEquals(Integer.toString(i++), result.getData()); +// } +// assertEquals(concurrency, i); +// +// intpA.close(); +// } + + +// @Test +// public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { +// Properties p = new Properties(); +// intpGroup.put("note", new LinkedList<Interpreter>()); +// +// RemoteInterpreter intpA = createMockInterpreterA(p); +// +// intpGroup.get("note").add(intpA); +// intpA.setInterpreterGroup(intpGroup); +// +// RemoteInterpreter intpB = createMockInterpreterB(p); +// +// intpGroup.get("note").add(intpB); +// intpB.setInterpreterGroup(intpGroup); +// +// intpA.open(); +// intpB.open(); +// +// assertEquals(intpA.getScheduler(), intpB.getScheduler()); +// } + +// @Test +// public void testMultiInterpreterSession() { +// Properties p = new Properties(); +// intpGroup.put("sessionA", new LinkedList<Interpreter>()); +// intpGroup.put("sessionB", new LinkedList<Interpreter>()); +// +// RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA"); +// intpGroup.get("sessionA").add(intpAsessionA); +// intpAsessionA.setInterpreterGroup(intpGroup); +// +// RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA"); +// intpGroup.get("sessionA").add(intpBsessionA); +// intpBsessionA.setInterpreterGroup(intpGroup); +// +// intpAsessionA.open(); +// intpBsessionA.open(); +// +// assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler()); +// +// RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB"); +// intpGroup.get("sessionB").add(intpAsessionB); +// intpAsessionB.setInterpreterGroup(intpGroup); +// +// RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB"); +// intpGroup.get("sessionB").add(intpBsessionB); +// intpBsessionB.setInterpreterGroup(intpGroup); +// +// intpAsessionB.open(); +// intpBsessionB.open(); +// +// assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler()); +// assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler()); +// } + +// @Test +// public void should_push_local_angular_repo_to_remote() throws Exception { +// //Given +// final Client client = mock(Client.class); +// final RemoteInterpreter intr = null; +//// new RemoteInterpreter(new Properties(), "noteId", +//// MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null, +//// null, "anonymous", false); +// final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null); +// registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId"); +// final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId"); +// interpreterGroup.setAngularObjectRegistry(registry); +// intr.setInterpreterGroup(interpreterGroup); +// +// final java.lang.reflect.Type registryType = new TypeToken<Map<String, +// Map<String, AngularObject>>>() { +// }.getType(); +// final Gson gson = new Gson(); +// final String expected = gson.toJson(registry.getRegistry(), registryType); +// +// //When +//// intr.pushAngularObjectRegistryToRemote(client); +// +// //Then +// Mockito.verify(client).angularRegistryPush(expected); +// } + + @Test + public void testEnvStringPattern() { + assertFalse(RemoteInterpreterUtils.isEnvString(null)); + assertFalse(RemoteInterpreterUtils.isEnvString("")); + assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF")); + assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123")); + } + + @Test + public void testEnvironmentAndProperty() { + interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); + interpreterSetting.setProperty("ENV_1", "VALUE_1"); + interpreterSetting.setProperty("property_1", "value_1"); + + final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "get"); + final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + + assertEquals("VALUE_1", interpreter1.interpret("getEnv ENV_1", context1).message().get(0).getData()); + assertEquals("null", interpreter1.interpret("getEnv ENV_2", context1).message().get(0).getData()); + + assertEquals("value_1", interpreter1.interpret("getProperty property_1", context1).message().get(0).getData()); + assertEquals("null", interpreter1.interpret("getProperty property_2", context1).message().get(0).getData()); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java new file mode 100644 index 0000000..5f7426a --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertTrue; + +public class RemoteInterpreterUtilsTest { + + @Test + public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { + assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java new file mode 100644 index 0000000..a039a59 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.util.List; +import java.util.Properties; + + +public class GetEnvPropertyInterpreter extends Interpreter { + + public GetEnvPropertyInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] cmd = st.split(" "); + if (cmd[0].equals("getEnv")) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]) == null ? "null" : System.getenv(cmd[1])); + } else if (cmd[0].equals("getProperty")){ + return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]) == null ? "null" : System.getProperty(cmd[1])); + } else { + return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java new file mode 100644 index 0000000..5a3e57c --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.util.List; +import java.util.Properties; + +public class MockInterpreterA extends Interpreter { + + private String lastSt; + + public MockInterpreterA(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + if (property.containsKey("progress")) { + context.setProgress(Integer.parseInt(getProperty("progress"))); + } + try { + Thread.sleep(Long.parseLong(st)); + this.lastSt = st; + } catch (NumberFormatException | InterruptedException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(Code.SUCCESS, st); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { + return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); + } else { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java new file mode 100644 index 0000000..ec89241 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +public class MockInterpreterAngular extends Interpreter { + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterAngular(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String name = null; + if (stmt.length >= 2) { + name = stmt[1]; + } + String value = null; + if (stmt.length == 3) { + value = stmt[2]; + } + + AngularObjectRegistry registry = context.getAngularObjectRegistry(); + + if (cmd.equals("add")) { + registry.add(name, value, context.getNoteId(), null); + registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher + (null) { + + @Override + public void watch(Object oldObject, Object newObject, + InterpreterContext context) { + numWatch.incrementAndGet(); + } + + }); + } else if (cmd.equalsIgnoreCase("update")) { + registry.get(name, context.getNoteId(), null).set(value); + } else if (cmd.equals("remove")) { + registry.remove(name, context.getNoteId(), null); + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e); + } + + String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch + .get()); + return new InterpreterResult(Code.SUCCESS, msg); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java new file mode 100644 index 0000000..ff3ff9f --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; + +import java.util.List; +import java.util.Properties; + +public class MockInterpreterB extends Interpreter { + + public MockInterpreterB(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + MockInterpreterA intpA = getInterpreterA(); + String intpASt = intpA.getLastStatement(); + long timeToSleep = Long.parseLong(st); + if (intpASt != null) { + timeToSleep += Long.parseLong(intpASt); + } + try { + Thread.sleep(timeToSleep); + } catch (NumberFormatException | InterruptedException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + public MockInterpreterA getInterpreterA() { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + synchronized (interpreterGroup) { + for (List<Interpreter> interpreters : interpreterGroup.values()) { + boolean belongsToSameNoteGroup = false; + MockInterpreterA a = null; + for (Interpreter intp : interpreters) { + if (intp.getClassName().equals(MockInterpreterA.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + a = (MockInterpreterA) p; + } + + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + if (this == p) { + belongsToSameNoteGroup = true; + } + } + if (belongsToSameNoteGroup) { + return a; + } + } + } + return null; + } + + @Override + public Scheduler getScheduler() { + MockInterpreterA intpA = getInterpreterA(); + if (intpA != null) { + return intpA.getScheduler(); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java new file mode 100644 index 0000000..1890cbc --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * MockInterpreter to test outputstream + */ +public class MockInterpreterOutputStream extends Interpreter { + private String lastSt; + + public MockInterpreterOutputStream(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] ret = st.split(":"); + try { + if (ret[1] != null) { + context.out.write(ret[1]); + } + } catch (IOException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? + ret[2] : ""); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java new file mode 100644 index 0000000..ee9f15c --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import com.google.gson.Gson; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +public class MockInterpreterResourcePool extends Interpreter { + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterResourcePool(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String noteId = null; + String paragraphId = null; + String name = null; + if (stmt.length >= 2) { + String[] npn = stmt[1].split(":"); + if (npn.length >= 3) { + noteId = npn[0]; + paragraphId = npn[1]; + name = npn[2]; + } else { + name = stmt[1]; + } + } + String value = null; + if (stmt.length >= 3) { + value = stmt[2]; + } + + ResourcePool resourcePool = context.getResourcePool(); + Object ret = null; + if (cmd.equals("put")) { + resourcePool.put(noteId, paragraphId, name, value); + } else if (cmd.equalsIgnoreCase("get")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (resource != null) { + ret = resourcePool.get(noteId, paragraphId, name).get(); + } else { + ret = ""; + } + } else if (cmd.equals("remove")) { + ret = resourcePool.remove(noteId, paragraphId, name); + } else if (cmd.equals("getAll")) { + ret = resourcePool.getAll(); + } else if (cmd.equals("invoke")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (stmt.length >=4) { + Resource res = resource.invokeMethod(value, null, null, stmt[3]); + ret = res.get(); + } else { + ret = resource.invokeMethod(value, null, null); + } + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + } + + Gson gson = new Gson(); + return new InterpreterResult(Code.SUCCESS, gson.toJson(ret)); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java new file mode 100644 index 0000000..a1afe0e --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterInfo; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterRunner; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.scheduler.Job.Status; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private InterpreterSetting interpreterSetting; + private SchedulerFactory schedulerSvc; + private static final int TICK_WAIT = 100; + private static final int MAX_WAIT_CYCLES = 100; + + @Before + public void setUp() throws Exception { + schedulerSvc = new SchedulerFactory(); + + InterpreterOption interpreterOption = new InterpreterOption(); + interpreterOption.setRemote(true); + InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterA.class.getName(), "mock", true, new HashMap<String, Object>()); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(interpreterInfo1); + InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); + interpreterSetting = new InterpreterSetting.Builder() + .setId("test") + .setName("test") + .setGroup("test") + .setInterpreterInfos(interpreterInfos) + .setOption(interpreterOption) + .setRunner(runner) + .setInterpreterDir("../interpeters/test") + .create(); + } + + @After + public void tearDown() { + interpreterSetting.close(); + } + + @Test + public void test() throws Exception { + final RemoteInterpreter intpA = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", + intpA, + 10); + + Job job = new Job("jobId", "jobName", null, 200) { + Object results; + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", new InterpreterContext( + "note", + "jobId", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + null, + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + return "1000"; + } + + @Override + protected boolean jobAbort() { + return false; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + scheduler.submit(job); + + int cycles = 0; + while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job.isRunning()); + + Thread.sleep(5 * TICK_WAIT); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(1, scheduler.getJobsRunning().size()); + + cycles = 0; + while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertTrue(job.isTerminated()); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(0, scheduler.getJobsRunning().size()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + + @Test + public void testAbortOnPending() throws Exception { + final RemoteInterpreter intpA = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA, 10); + + Job job1 = new Job("jobId1", "jobName1", null, 200) { + Object results; + InterpreterContext context = new InterpreterContext( + "note", + "jobId1", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + null, + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + + Job job2 = new Job("jobId2", "jobName2", null, 200) { + public Object results; + InterpreterContext context = new InterpreterContext( + "note", + "jobId2", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + null, + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + + job2.setResult("result2"); + + scheduler.submit(job1); + scheduler.submit(job2); + + + int cycles = 0; + while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job1.isRunning()); + assertTrue(job2.getStatus() == Status.PENDING); + + job2.abort(); + + cycles = 0; + while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertNotNull(job1.getDateFinished()); + assertTrue(job1.isTerminated()); + assertNull(job2.getDateFinished()); + assertTrue(job2.isTerminated()); + assertEquals("result2", job2.getReturn()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, int index, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { + + } + + @Override + public void onOutputClear(String noteId, String paragraphId) { + + } + + @Override + public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { + + } + + @Override + public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { + if (callback != null) { + callback.onFinished(new LinkedList<>()); + } + } + + @Override + public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { + } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/resources/conf/interpreter.json ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json new file mode 100644 index 0000000..45e1d60 --- /dev/null +++ b/zeppelin-interpreter/src/test/resources/conf/interpreter.json @@ -0,0 +1,115 @@ +{ + "interpreterSettings": { + "2C3RWCVAG": { + "id": "2C3RWCVAG", + "name": "test", + "group": "test", + "properties": { + "property_1": "value_1", + "property_2": "new_value_2", + "property_3": "value_3" + }, + "status": "READY", + "interpreterGroup": [ + { + "name": "echo", + "class": "org.apache.zeppelin.interpreter.EchoInterpreter", + "defaultInterpreter": true, + "editor": { + "language": "java", + "editOnDblClick": false + } + } + ], + "dependencies": [], + "option": { + "remote": true, + "port": -1, + "perNote": "shared", + "perUser": "shared", + "isExistingProcess": false, + "setPermission": false, + "users": [], + "isUserImpersonate": false + } + }, + + "2CKWE7B19": { + "id": "2CKWE7B19", + "name": "test2", + "group": "test", + "properties": { + "property_1": "value_1", + "property_2": "new_value_2", + "property_3": "value_3" + }, + "status": "READY", + "interpreterGroup": [ + { + "name": "echo", + "class": "org.apache.zeppelin.interpreter.EchoInterpreter", + "defaultInterpreter": true, + "editor": { + "language": "java", + "editOnDblClick": false + } + } + ], + "dependencies": [], + "option": { + "remote": true, + "port": -1, + "perNote": "shared", + "perUser": "shared", + "isExistingProcess": false, + "setPermission": false, + "users": [], + "isUserImpersonate": false + } + } + }, + "interpreterBindings": { + "2C6793KRV": [ + "2C48Y7FSJ", + "2C63XW4XE", + "2C66GE1VB", + "2C5VH924X", + "2C4BJDRRZ", + "2C3SQSB7V", + "2C4HKDCQW", + "2C3DR183X", + "2C66Z9XPQ", + "2C3PTPMUH", + "2C69WE69N", + "2C5SRRXHM", + "2C4ZD49PF", + "2C6V3D44K", + "2C4UB1UZA", + "2C5S1R21W", + "2C5DCRVGM", + "2C686X8ZH", + "2C3RWCVAG", + "2C3JKFMJU", + "2C3VECEG2" + ] + }, + "interpreterRepositories": [ + { + "id": "central", + "type": "default", + "url": "http://repo1.maven.org/maven2/", + "releasePolicy": { + "enabled": true, + "updatePolicy": "daily", + "checksumPolicy": "warn" + }, + "snapshotPolicy": { + "enabled": true, + "updatePolicy": "daily", + "checksumPolicy": "warn" + }, + "mirroredRepositories": [], + "repositoryManager": false + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json new file mode 100644 index 0000000..1ba1b94 --- /dev/null +++ b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json @@ -0,0 +1,42 @@ +[ + { + "group": "test", + "name": "double_echo", + "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter", + "properties": { + "property_1": { + "envName": "PROPERTY_1", + "propertyName": "property_1", + "defaultValue": "value_1", + "description": "desc_1" + }, + "property_2": { + "envName": "PROPERTY_2", + "propertyName": "property_2", + "defaultValue": "value_2", + "description": "desc_2" + } + } + }, + + { + "group": "test", + "name": "echo", + "defaultInterpreter": true, + "className": "org.apache.zeppelin.interpreter.EchoInterpreter", + "properties": { + "property_1": { + "envName": "PROPERTY_1", + "propertyName": "property_1", + "defaultValue": "value_1", + "description": "desc_1" + }, + "property_2": { + "envName": "PROPERTY_2", + "propertyName": "property_2", + "defaultValue": "value_2", + "description": "desc_2" + } + } + } +] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties index d8a7839..6f34691 100644 --- a/zeppelin-interpreter/src/test/resources/log4j.properties +++ b/zeppelin-interpreter/src/test/resources/log4j.properties @@ -26,4 +26,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n # # Root logger option -log4j.rootLogger=INFO, stdout \ No newline at end of file +log4j.rootLogger=INFO, stdout +log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.scheduler=DEBUG \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index cd0210e..c1dba5c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -185,7 +185,7 @@ public class InterpreterRestApi { String noteId = request == null ? null : request.getNoteId(); if (null == noteId) { - interpreterSettingManager.close(setting); + interpreterSettingManager.close(settingId); } else { interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal()); } @@ -208,7 +208,7 @@ public class InterpreterRestApi { @GET @ZeppelinApi public Response listInterpreter(String message) { - Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings(); + Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates(); return new JsonResponse<>(Status.OK, "", m).build(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 7453470..53ee114 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -93,13 +93,11 @@ public class ZeppelinServer extends Application { private NotebookRepoSync notebookRepo; private NotebookAuthorization notebookAuthorization; private Credentials credentials; - private DependencyResolver depResolver; public ZeppelinServer() throws Exception { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - this.depResolver = new DependencyResolver( - conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO)); + InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT); @@ -129,13 +127,26 @@ public class ZeppelinServer extends Application { new File(conf.getRelativeDir("zeppelin-web/src/app/spell"))); } + this.schedulerFactory = SchedulerFactory.singleton(); + this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer, + notebookWsServer, notebookWsServer); + this.replFactory = new InterpreterFactory(interpreterSettingManager); + this.notebookRepo = new NotebookRepoSync(conf); + this.noteSearchService = new LuceneSearch(); + this.notebookAuthorization = NotebookAuthorization.init(conf); + this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); + notebook = new Notebook(conf, + notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer, + noteSearchService, notebookAuthorization, credentials); + ZeppelinServer.helium = new Helium( conf.getHeliumConfPath(), conf.getHeliumRegistry(), new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO), "helium-registry-cache"), heliumBundleFactory, - heliumApplicationFactory); + heliumApplicationFactory, + interpreterSettingManager); // create bundle try { @@ -144,20 +155,6 @@ public class ZeppelinServer extends Application { LOG.error(e.getMessage(), e); } - this.schedulerFactory = new SchedulerFactory(); - this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, - new InterpreterOption(true)); - this.replFactory = new InterpreterFactory(conf, notebookWsServer, - notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(), - interpreterSettingManager); - this.notebookRepo = new NotebookRepoSync(conf); - this.noteSearchService = new LuceneSearch(); - this.notebookAuthorization = NotebookAuthorization.init(conf); - this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); - notebook = new Notebook(conf, - notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer, - noteSearchService, notebookAuthorization, credentials); - // to update notebook from application event from remote process. heliumApplicationFactory.setNotebook(notebook); // to update fire websocket event on application event. @@ -206,7 +203,7 @@ public class ZeppelinServer extends Application { LOG.info("Shutting down Zeppelin Server ... "); try { jettyWebServer.stop(); - notebook.getInterpreterSettingManager().shutdown(); + notebook.getInterpreterSettingManager().close(); notebook.close(); Thread.sleep(3000); } catch (Exception e) {