http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java deleted file mode 100644 index 61e4ef0..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; - -public class RemoteAngularObjectTest implements AngularObjectRegistryListener { - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - - private RemoteInterpreter intp; - private InterpreterContext context; - private RemoteAngularObjectRegistry localRegistry; - private InterpreterSetting interpreterSetting; - - private AtomicInteger onAdd; - private AtomicInteger onUpdate; - private AtomicInteger onRemove; - - @Before - public void setUp() throws Exception { - onAdd = new AtomicInteger(0); - onUpdate = new AtomicInteger(0); - onRemove = new AtomicInteger(0); - - InterpreterOption interpreterOption = new InterpreterOption(); - interpreterOption.setRemote(true); - InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterAngular.class.getName(), "mock", true, new HashMap<String, Object>()); - List<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(interpreterInfo1); - InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); - interpreterSetting = new InterpreterSetting.Builder() - .setId("test") - .setName("test") - .setGroup("test") - .setInterpreterInfos(interpreterInfos) - .setOption(interpreterOption) - .setRunner(runner) - .setInterpreterDir("../interpeters/test") - .create(); - - intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - localRegistry = (RemoteAngularObjectRegistry) intp.getInterpreterGroup().getAngularObjectRegistry(); - - context = new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intp.getInterpreterGroup().getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - intp.open(); - - } - - @After - public void tearDown() throws Exception { - interpreterSetting.close(); - } - - @Test - public void testAngularObjectInterpreterSideCRUD() throws InterruptedException { - InterpreterResult ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - String[] result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - assertEquals("0", result[1]); // num watcher called - - // create object - ret = intp.interpret("add n1 v1", context); - Thread.sleep(500); - result = ret.message().get(0).getData().split(" "); - assertEquals("1", result[0]); // size of registry - assertEquals("0", result[1]); // num watcher called - assertEquals("v1", localRegistry.get("n1", "note", null).get()); - - // update object - ret = intp.interpret("update n1 v11", context); - result = ret.message().get(0).getData().split(" "); - Thread.sleep(500); - assertEquals("1", result[0]); // size of registry - assertEquals("1", result[1]); // num watcher called - assertEquals("v11", localRegistry.get("n1", "note", null).get()); - - // remove object - ret = intp.interpret("remove n1", context); - result = ret.message().get(0).getData().split(" "); - Thread.sleep(500); - assertEquals("0", result[0]); // size of registry - assertEquals("1", result[1]); // num watcher called - assertEquals(null, localRegistry.get("n1", "note", null)); - } - - @Test - public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException { - // test if angularobject removal from server side propagate to interpreter process's registry. - // will happen when notebook is removed. - - InterpreterResult ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - String[] result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - - // create object - ret = intp.interpret("add n1 v1", context); - Thread.sleep(500); - result = ret.message().get(0).getData().split(" "); - assertEquals("1", result[0]); // size of registry - assertEquals("v1", localRegistry.get("n1", "note", null).get()); - - // remove object in local registry. - localRegistry.removeAndNotifyRemoteProcess("n1", "note", null); - ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - } - - @Test - public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException { - // test if angularobject add from server side propagate to interpreter process's registry. - // will happen when zeppelin server loads notebook and restore the object into registry - - InterpreterResult ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - String[] result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - - // create object - localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); - - // get from remote registry - ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - result = ret.message().get(0).getData().split(" "); - assertEquals("1", result[0]); // size of registry - } - - @Override - public void onAdd(String interpreterGroupId, AngularObject object) { - onAdd.incrementAndGet(); - } - - @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { - onUpdate.incrementAndGet(); - } - - @Override - public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { - onRemove.incrementAndGet(); - } - -}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java deleted file mode 100644 index 49aa7aa..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; -import org.junit.Test; - -import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class RemoteInterpreterEventPollerTest { - - @Test - public void shouldClearUnreadEventsOnShutdown() throws Exception { - RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess(); - RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null); - - eventPoller.setInterpreterProcess(interpreterProc); - eventPoller.shutdown(); - eventPoller.start(); - eventPoller.join(); - - assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType()); - } - - private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception { - RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent(); - RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, ""); - RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class); - RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class); - - when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents); - when(intProc.getClient()).thenReturn(client); - - return intProc; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java deleted file mode 100644 index 1687060..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.*; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; - - -/** - * Test for remote interpreter output stream - */ -public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener { - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - - private InterpreterSetting interpreterSetting; - - @Before - public void setUp() throws Exception { - InterpreterOption interpreterOption = new InterpreterOption(); - - interpreterOption.setRemote(true); - InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterOutputStream.class.getName(), "mock", true, new HashMap<String, Object>()); - List<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(interpreterInfo1); - InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); - interpreterSetting = new InterpreterSetting.Builder() - .setId("test") - .setName("test") - .setGroup("test") - .setInterpreterInfos(interpreterInfos) - .setOption(interpreterOption) - .setRunner(runner) - .setInterpreterDir("../interpeters/test") - .create(); - } - - @After - public void tearDown() throws Exception { - interpreterSetting.close(); - } - - private InterpreterContext createInterpreterContext() { - return new InterpreterContext( - "noteId", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - null, - null, - new LinkedList<InterpreterContextRunner>(), null); - } - - @Test - public void testInterpreterResultOnly() { - RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("staticresult", ret.message().get(0).getData()); - - ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("staticresult2", ret.message().get(0).getData()); - - ret = intp.interpret("ERROR::staticresult3", createInterpreterContext()); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertEquals("staticresult3", ret.message().get(0).getData()); - } - - @Test - public void testInterpreterOutputStreamOnly() { - RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("streamresult", ret.message().get(0).getData()); - - ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext()); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertEquals("streamresult2", ret.message().get(0).getData()); - } - - @Test - public void testInterpreterResultOutputStreamMixed() { - RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("stream", ret.message().get(0).getData()); - assertEquals("static", ret.message().get(1).getData()); - } - - @Test - public void testOutputType() { - RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - - InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext()); - assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); - assertEquals("hello", ret.message().get(0).getData()); - - ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext()); - assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); - assertEquals("hello", ret.message().get(0).getData()); - - ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext()); - assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); - assertEquals("hello", ret.message().get(0).getData()); - assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType()); - assertEquals("world", ret.message().get(1).getData()); - } - - @Override - public void onOutputAppend(String noteId, String paragraphId, int index, String output) { - - } - - @Override - public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { - - } - - @Override - public void onOutputClear(String noteId, String paragraphId) { - - } - - @Override - public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { - - } - - @Override - public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { - if (callback != null) { - callback.onFinished(new LinkedList<>()); - } - } - - @Override - public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { - - } - - @Override - public void onParaInfosReceived(String noteId, String paragraphId, - String interpreterSettingId, Map<String, String> metaInfos) { - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java deleted file mode 100644 index ae98dc3..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ /dev/null @@ -1,520 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; - -public class RemoteInterpreterTest { - - - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - - private InterpreterSetting interpreterSetting; - - @Before - public void setUp() throws Exception { - InterpreterOption interpreterOption = new InterpreterOption(); - - interpreterOption.setRemote(true); - InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()); - InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()); - InterpreterInfo interpreterInfo3 = new InterpreterInfo(SleepInterpreter.class.getName(), "sleep", false, new HashMap<String, Object>()); - InterpreterInfo interpreterInfo4 = new InterpreterInfo(GetEnvPropertyInterpreter.class.getName(), "get", false, new HashMap<String, Object>()); - List<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(interpreterInfo1); - interpreterInfos.add(interpreterInfo2); - interpreterInfos.add(interpreterInfo3); - interpreterInfos.add(interpreterInfo4); - InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); - interpreterSetting = new InterpreterSetting.Builder() - .setId("test") - .setName("test") - .setGroup("test") - .setInterpreterInfos(interpreterInfos) - .setOption(interpreterOption) - .setRunner(runner) - .setInterpreterDir("../interpeters/test") - .create(); - } - - @After - public void tearDown() throws Exception { - interpreterSetting.close(); - } - - @Test - public void testSharedMode() { - interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); - - Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); - Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1"); - assertTrue(interpreter1 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; - assertTrue(interpreter2 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; - - InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); - assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType()); - assertEquals(0, remoteInterpreter1.getProgress(context1)); - assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess()); - assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); - - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(), - remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); - - // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the - // RemoteInterpreterProcess leakage. - remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId()); - assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess()); - try { - assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); - fail("Should not be able to call interpret after interpreter is closed"); - } catch (Exception e) { - e.printStackTrace(); - } - - try { - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - fail("Should not be able to call getProgress after RemoterInterpreterProcess is stoped"); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testScopedMode() { - interpreterSetting.getOption().setPerUser(InterpreterOption.SCOPED); - - Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); - Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1"); - assertTrue(interpreter1 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; - assertTrue(interpreter2 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; - - InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType()); - assertEquals(0, remoteInterpreter1.getProgress(context1)); - - assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess()); - assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); - - assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(), - remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); - // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the - // RemoteInterpreterProcess leakage. - remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId()); - try { - assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); - fail("Should not be able to call interpret after interpreter is closed"); - } catch (Exception e) { - e.printStackTrace(); - } - - assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId()); - try { - assertEquals("hello", remoteInterpreter2.interpret("hello", context1)); - fail("Should not be able to call interpret after interpreter is closed"); - } catch (Exception e) { - e.printStackTrace(); - } - assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); - } - - @Test - public void testIsolatedMode() { - interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED); - - Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); - Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1"); - assertTrue(interpreter1 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; - assertTrue(interpreter2 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; - - InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData()); - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType()); - assertEquals(0, remoteInterpreter1.getProgress(context1)); - assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess()); - assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); - - assertNotEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(), - remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); - // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the - // RemoteInterpreterProcess leakage. - remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId()); - assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess()); - assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning()); - try { - remoteInterpreter1.interpret("hello", context1); - fail("Should not be able to call getProgress after interpreter is closed"); - } catch (Exception e) { - e.printStackTrace(); - } - - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId()); - try { - assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData()); - fail("Should not be able to call interpret after interpreter is closed"); - } catch (Exception e) { - e.printStackTrace(); - } - assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess()); - - } - -// @Test -// public void testExecuteIncorrectPrecode() throws TTransportException, IOException { -// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); -// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "fail test"); -// -// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); -// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", -// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), -// null, null, new ArrayList<InterpreterContextRunner>(), null); -// assertEquals(Code.ERROR, interpreter1.interpret("10", context1).code()); -// } -// -// @Test -// public void testExecuteCorrectPrecode() throws TTransportException, IOException { -// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); -// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "1"); -// -// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); -// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", -// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), -// null, null, new ArrayList<InterpreterContextRunner>(), null); -// assertEquals(Code.SUCCESS, interpreter1.interpret("10", context1).code()); -// } - - @Test - public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { - interpreterSetting.setProperty("zeppelin.interpreter.echo.fail", "true"); - interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); - - Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); - assertTrue(interpreter1 instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; - - InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - assertEquals(Code.ERROR, remoteInterpreter1.interpret("hello", context1).code()); - } - - @Test - public void testFIFOScheduler() throws InterruptedException { - interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); - // by default SleepInterpreter would use FIFOScheduler - - final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); - final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the - // time overhead of launching the process. - interpreter1.interpret("1", context1); - Thread thread1 = new Thread() { - @Override - public void run() { - assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); - } - }; - Thread thread2 = new Thread() { - @Override - public void run() { - assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); - } - }; - long start = System.currentTimeMillis(); - thread1.start(); - thread2.start(); - thread1.join(); - thread2.join(); - long end = System.currentTimeMillis(); - assertTrue((end - start) >= 200); - } - - @Test - public void testParallelScheduler() throws InterruptedException { - interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); - interpreterSetting.setProperty("zeppelin.SleepInterpreter.parallel", "true"); - - final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep"); - final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - - // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the - // time overhead of launching the process. - interpreter1.interpret("1", context1); - Thread thread1 = new Thread() { - @Override - public void run() { - assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); - } - }; - Thread thread2 = new Thread() { - @Override - public void run() { - assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code()); - } - }; - long start = System.currentTimeMillis(); - thread1.start(); - thread2.start(); - thread1.join(); - thread2.join(); - long end = System.currentTimeMillis(); - assertTrue((end - start) <= 200); - } - -// @Test -// public void testRunOrderPreserved() throws InterruptedException { -// Properties p = new Properties(); -// intpGroup.put("note", new LinkedList<Interpreter>()); -// -// final RemoteInterpreter intpA = createMockInterpreterA(p); -// -// intpGroup.get("note").add(intpA); -// intpA.setInterpreterGroup(intpGroup); -// -// intpA.open(); -// -// int concurrency = 3; -// final List<InterpreterResultMessage> results = new LinkedList<>(); -// -// Scheduler scheduler = intpA.getScheduler(); -// for (int i = 0; i < concurrency; i++) { -// final String jobId = Integer.toString(i); -// scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { -// private Object r; -// -// @Override -// public Object getReturn() { -// return r; -// } -// -// @Override -// public void setResult(Object results) { -// this.r = results; -// } -// -// @Override -// public int progress() { -// return 0; -// } -// -// @Override -// public Map<String, Object> info() { -// return null; -// } -// -// @Override -// protected Object jobRun() throws Throwable { -// InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( -// "note", -// jobId, -// null, -// "title", -// "text", -// new AuthenticationInfo(), -// new HashMap<String, Object>(), -// new GUI(), -// new AngularObjectRegistry(intpGroup.getId(), null), -// new LocalResourcePool("pool1"), -// new LinkedList<InterpreterContextRunner>(), null)); -// -// synchronized (results) { -// results.addAll(ret.message()); -// results.notify(); -// } -// return null; -// } -// -// @Override -// protected boolean jobAbort() { -// return false; -// } -// -// }); -// } -// -// // wait for job finished -// synchronized (results) { -// while (results.size() != concurrency) { -// results.wait(300); -// } -// } -// -// int i = 0; -// for (InterpreterResultMessage result : results) { -// assertEquals(Integer.toString(i++), result.getData()); -// } -// assertEquals(concurrency, i); -// -// intpA.close(); -// } - - -// @Test -// public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { -// Properties p = new Properties(); -// intpGroup.put("note", new LinkedList<Interpreter>()); -// -// RemoteInterpreter intpA = createMockInterpreterA(p); -// -// intpGroup.get("note").add(intpA); -// intpA.setInterpreterGroup(intpGroup); -// -// RemoteInterpreter intpB = createMockInterpreterB(p); -// -// intpGroup.get("note").add(intpB); -// intpB.setInterpreterGroup(intpGroup); -// -// intpA.open(); -// intpB.open(); -// -// assertEquals(intpA.getScheduler(), intpB.getScheduler()); -// } - -// @Test -// public void testMultiInterpreterSession() { -// Properties p = new Properties(); -// intpGroup.put("sessionA", new LinkedList<Interpreter>()); -// intpGroup.put("sessionB", new LinkedList<Interpreter>()); -// -// RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA"); -// intpGroup.get("sessionA").add(intpAsessionA); -// intpAsessionA.setInterpreterGroup(intpGroup); -// -// RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA"); -// intpGroup.get("sessionA").add(intpBsessionA); -// intpBsessionA.setInterpreterGroup(intpGroup); -// -// intpAsessionA.open(); -// intpBsessionA.open(); -// -// assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler()); -// -// RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB"); -// intpGroup.get("sessionB").add(intpAsessionB); -// intpAsessionB.setInterpreterGroup(intpGroup); -// -// RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB"); -// intpGroup.get("sessionB").add(intpBsessionB); -// intpBsessionB.setInterpreterGroup(intpGroup); -// -// intpAsessionB.open(); -// intpBsessionB.open(); -// -// assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler()); -// assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler()); -// } - -// @Test -// public void should_push_local_angular_repo_to_remote() throws Exception { -// //Given -// final Client client = mock(Client.class); -// final RemoteInterpreter intr = null; -//// new RemoteInterpreter(new Properties(), "noteId", -//// MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null, -//// null, "anonymous", false); -// final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null); -// registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId"); -// final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId"); -// interpreterGroup.setAngularObjectRegistry(registry); -// intr.setInterpreterGroup(interpreterGroup); -// -// final java.lang.reflect.Type registryType = new TypeToken<Map<String, -// Map<String, AngularObject>>>() { -// }.getType(); -// final Gson gson = new Gson(); -// final String expected = gson.toJson(registry.getRegistry(), registryType); -// -// //When -//// intr.pushAngularObjectRegistryToRemote(client); -// -// //Then -// Mockito.verify(client).angularRegistryPush(expected); -// } - - @Test - public void testEnvStringPattern() { - assertFalse(RemoteInterpreterUtils.isEnvString(null)); - assertFalse(RemoteInterpreterUtils.isEnvString("")); - assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF")); - assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF")); - assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF")); - assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF")); - assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123")); - } - - @Test - public void testEnvironmentAndProperty() { - interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); - interpreterSetting.setProperty("ENV_1", "VALUE_1"); - interpreterSetting.setProperty("property_1", "value_1"); - - final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "get"); - final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", - "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), - null, null, new ArrayList<InterpreterContextRunner>(), null); - - assertEquals("VALUE_1", interpreter1.interpret("getEnv ENV_1", context1).message().get(0).getData()); - assertEquals("null", interpreter1.interpret("getEnv ENV_2", context1).message().get(0).getData()); - - assertEquals("value_1", interpreter1.interpret("getProperty property_1", context1).message().get(0).getData()); - assertEquals("null", interpreter1.interpret("getProperty property_2", context1).message().get(0).getData()); - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java deleted file mode 100644 index 5f7426a..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertTrue; - -public class RemoteInterpreterUtilsTest { - - @Test - public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { - assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java deleted file mode 100644 index a039a59..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -import java.util.List; -import java.util.Properties; - - -public class GetEnvPropertyInterpreter extends Interpreter { - - public GetEnvPropertyInterpreter(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] cmd = st.split(" "); - if (cmd[0].equals("getEnv")) { - return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]) == null ? "null" : System.getenv(cmd[1])); - } else if (cmd[0].equals("getProperty")){ - return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]) == null ? "null" : System.getProperty(cmd[1])); - } else { - return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]); - } - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } -} - http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java deleted file mode 100644 index 5a3e57c..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -import java.util.List; -import java.util.Properties; - -public class MockInterpreterA extends Interpreter { - - private String lastSt; - - public MockInterpreterA(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - public String getLastStatement() { - return lastSt; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - if (property.containsKey("progress")) { - context.setProgress(Integer.parseInt(getProperty("progress"))); - } - try { - Thread.sleep(Long.parseLong(st)); - this.lastSt = st; - } catch (NumberFormatException | InterruptedException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(Code.SUCCESS, st); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - @Override - public Scheduler getScheduler() { - if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { - return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); - } else { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java deleted file mode 100644 index ec89241..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectWatcher; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -public class MockInterpreterAngular extends Interpreter { - - AtomicInteger numWatch = new AtomicInteger(0); - - public MockInterpreterAngular(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] stmt = st.split(" "); - String cmd = stmt[0]; - String name = null; - if (stmt.length >= 2) { - name = stmt[1]; - } - String value = null; - if (stmt.length == 3) { - value = stmt[2]; - } - - AngularObjectRegistry registry = context.getAngularObjectRegistry(); - - if (cmd.equals("add")) { - registry.add(name, value, context.getNoteId(), null); - registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher - (null) { - - @Override - public void watch(Object oldObject, Object newObject, - InterpreterContext context) { - numWatch.incrementAndGet(); - } - - }); - } else if (cmd.equalsIgnoreCase("update")) { - registry.get(name, context.getNoteId(), null).set(value); - } else if (cmd.equals("remove")) { - registry.remove(name, context.getNoteId(), null); - } - - try { - Thread.sleep(500); // wait for watcher executed - } catch (InterruptedException e) { - logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e); - } - - String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch - .get()); - return new InterpreterResult(Code.SUCCESS, msg); - } - - @Override - public void cancel(InterpreterContext context) { - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java deleted file mode 100644 index ff3ff9f..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; - -import java.util.List; -import java.util.Properties; - -public class MockInterpreterB extends Interpreter { - - public MockInterpreterB(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - MockInterpreterA intpA = getInterpreterA(); - String intpASt = intpA.getLastStatement(); - long timeToSleep = Long.parseLong(st); - if (intpASt != null) { - timeToSleep += Long.parseLong(intpASt); - } - try { - Thread.sleep(timeToSleep); - } catch (NumberFormatException | InterruptedException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - public MockInterpreterA getInterpreterA() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - synchronized (interpreterGroup) { - for (List<Interpreter> interpreters : interpreterGroup.values()) { - boolean belongsToSameNoteGroup = false; - MockInterpreterA a = null; - for (Interpreter intp : interpreters) { - if (intp.getClassName().equals(MockInterpreterA.class.getName())) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - a = (MockInterpreterA) p; - } - - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - if (this == p) { - belongsToSameNoteGroup = true; - } - } - if (belongsToSameNoteGroup) { - return a; - } - } - } - return null; - } - - @Override - public Scheduler getScheduler() { - MockInterpreterA intpA = getInterpreterA(); - if (intpA != null) { - return intpA.getScheduler(); - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java deleted file mode 100644 index 1890cbc..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -/** - * MockInterpreter to test outputstream - */ -public class MockInterpreterOutputStream extends Interpreter { - private String lastSt; - - public MockInterpreterOutputStream(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - public String getLastStatement() { - return lastSt; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] ret = st.split(":"); - try { - if (ret[1] != null) { - context.out.write(ret[1]); - } - } catch (IOException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? - ret[2] : ""); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java deleted file mode 100644 index ee9f15c..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import com.google.gson.Gson; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.resource.Resource; -import org.apache.zeppelin.resource.ResourcePool; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -public class MockInterpreterResourcePool extends Interpreter { - - AtomicInteger numWatch = new AtomicInteger(0); - - public MockInterpreterResourcePool(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] stmt = st.split(" "); - String cmd = stmt[0]; - String noteId = null; - String paragraphId = null; - String name = null; - if (stmt.length >= 2) { - String[] npn = stmt[1].split(":"); - if (npn.length >= 3) { - noteId = npn[0]; - paragraphId = npn[1]; - name = npn[2]; - } else { - name = stmt[1]; - } - } - String value = null; - if (stmt.length >= 3) { - value = stmt[2]; - } - - ResourcePool resourcePool = context.getResourcePool(); - Object ret = null; - if (cmd.equals("put")) { - resourcePool.put(noteId, paragraphId, name, value); - } else if (cmd.equalsIgnoreCase("get")) { - Resource resource = resourcePool.get(noteId, paragraphId, name); - if (resource != null) { - ret = resourcePool.get(noteId, paragraphId, name).get(); - } else { - ret = ""; - } - } else if (cmd.equals("remove")) { - ret = resourcePool.remove(noteId, paragraphId, name); - } else if (cmd.equals("getAll")) { - ret = resourcePool.getAll(); - } else if (cmd.equals("invoke")) { - Resource resource = resourcePool.get(noteId, paragraphId, name); - if (stmt.length >=4) { - Resource res = resource.invokeMethod(value, null, null, stmt[3]); - ret = res.get(); - } else { - ret = resource.invokeMethod(value, null, null); - } - } - - try { - Thread.sleep(500); // wait for watcher executed - } catch (InterruptedException e) { - } - - Gson gson = new Gson(); - return new InterpreterResult(Code.SUCCESS, gson.toJson(ret)); - } - - @Override - public void cancel(InterpreterContext context) { - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java deleted file mode 100644 index a1afe0e..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scheduler; - -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterInfo; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterRunner; -import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.scheduler.Job.Status; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { - - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - - private InterpreterSetting interpreterSetting; - private SchedulerFactory schedulerSvc; - private static final int TICK_WAIT = 100; - private static final int MAX_WAIT_CYCLES = 100; - - @Before - public void setUp() throws Exception { - schedulerSvc = new SchedulerFactory(); - - InterpreterOption interpreterOption = new InterpreterOption(); - interpreterOption.setRemote(true); - InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterA.class.getName(), "mock", true, new HashMap<String, Object>()); - List<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(interpreterInfo1); - InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT); - interpreterSetting = new InterpreterSetting.Builder() - .setId("test") - .setName("test") - .setGroup("test") - .setInterpreterInfos(interpreterInfos) - .setOption(interpreterOption) - .setRunner(runner) - .setInterpreterDir("../interpeters/test") - .create(); - } - - @After - public void tearDown() { - interpreterSetting.close(); - } - - @Test - public void test() throws Exception { - final RemoteInterpreter intpA = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - - intpA.open(); - - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", - intpA, - 10); - - Job job = new Job("jobId", "jobName", null, 200) { - Object results; - - @Override - public Object getReturn() { - return results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", new InterpreterContext( - "note", - "jobId", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - null, - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - return "1000"; - } - - @Override - protected boolean jobAbort() { - return false; - } - - @Override - public void setResult(Object results) { - this.results = results; - } - }; - scheduler.submit(job); - - int cycles = 0; - while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - assertTrue(job.isRunning()); - - Thread.sleep(5 * TICK_WAIT); - assertEquals(0, scheduler.getJobsWaiting().size()); - assertEquals(1, scheduler.getJobsRunning().size()); - - cycles = 0; - while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - - assertTrue(job.isTerminated()); - assertEquals(0, scheduler.getJobsWaiting().size()); - assertEquals(0, scheduler.getJobsRunning().size()); - - intpA.close(); - schedulerSvc.removeScheduler("test"); - } - - @Test - public void testAbortOnPending() throws Exception { - final RemoteInterpreter intpA = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1"); - intpA.open(); - - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA, 10); - - Job job1 = new Job("jobId1", "jobName1", null, 200) { - Object results; - InterpreterContext context = new InterpreterContext( - "note", - "jobId1", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - null, - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - @Override - public Object getReturn() { - return results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", context); - return "1000"; - } - - @Override - protected boolean jobAbort() { - if (isRunning()) { - intpA.cancel(context); - } - return true; - } - - @Override - public void setResult(Object results) { - this.results = results; - } - }; - - Job job2 = new Job("jobId2", "jobName2", null, 200) { - public Object results; - InterpreterContext context = new InterpreterContext( - "note", - "jobId2", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - null, - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - @Override - public Object getReturn() { - return results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", context); - return "1000"; - } - - @Override - protected boolean jobAbort() { - if (isRunning()) { - intpA.cancel(context); - } - return true; - } - - @Override - public void setResult(Object results) { - this.results = results; - } - }; - - job2.setResult("result2"); - - scheduler.submit(job1); - scheduler.submit(job2); - - - int cycles = 0; - while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - assertTrue(job1.isRunning()); - assertTrue(job2.getStatus() == Status.PENDING); - - job2.abort(); - - cycles = 0; - while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - - assertNotNull(job1.getDateFinished()); - assertTrue(job1.isTerminated()); - assertNull(job2.getDateFinished()); - assertTrue(job2.isTerminated()); - assertEquals("result2", job2.getReturn()); - - intpA.close(); - schedulerSvc.removeScheduler("test"); - } - - @Override - public void onOutputAppend(String noteId, String paragraphId, int index, String output) { - - } - - @Override - public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { - - } - - @Override - public void onOutputClear(String noteId, String paragraphId) { - - } - - @Override - public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { - - } - - @Override - public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { - if (callback != null) { - callback.onFinished(new LinkedList<>()); - } - } - - @Override - public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { - } - - @Override - public void onParaInfosReceived(String noteId, String paragraphId, - String interpreterSettingId, Map<String, String> metaInfos) { - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/resources/conf/interpreter.json ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json deleted file mode 100644 index 45e1d60..0000000 --- a/zeppelin-interpreter/src/test/resources/conf/interpreter.json +++ /dev/null @@ -1,115 +0,0 @@ -{ - "interpreterSettings": { - "2C3RWCVAG": { - "id": "2C3RWCVAG", - "name": "test", - "group": "test", - "properties": { - "property_1": "value_1", - "property_2": "new_value_2", - "property_3": "value_3" - }, - "status": "READY", - "interpreterGroup": [ - { - "name": "echo", - "class": "org.apache.zeppelin.interpreter.EchoInterpreter", - "defaultInterpreter": true, - "editor": { - "language": "java", - "editOnDblClick": false - } - } - ], - "dependencies": [], - "option": { - "remote": true, - "port": -1, - "perNote": "shared", - "perUser": "shared", - "isExistingProcess": false, - "setPermission": false, - "users": [], - "isUserImpersonate": false - } - }, - - "2CKWE7B19": { - "id": "2CKWE7B19", - "name": "test2", - "group": "test", - "properties": { - "property_1": "value_1", - "property_2": "new_value_2", - "property_3": "value_3" - }, - "status": "READY", - "interpreterGroup": [ - { - "name": "echo", - "class": "org.apache.zeppelin.interpreter.EchoInterpreter", - "defaultInterpreter": true, - "editor": { - "language": "java", - "editOnDblClick": false - } - } - ], - "dependencies": [], - "option": { - "remote": true, - "port": -1, - "perNote": "shared", - "perUser": "shared", - "isExistingProcess": false, - "setPermission": false, - "users": [], - "isUserImpersonate": false - } - } - }, - "interpreterBindings": { - "2C6793KRV": [ - "2C48Y7FSJ", - "2C63XW4XE", - "2C66GE1VB", - "2C5VH924X", - "2C4BJDRRZ", - "2C3SQSB7V", - "2C4HKDCQW", - "2C3DR183X", - "2C66Z9XPQ", - "2C3PTPMUH", - "2C69WE69N", - "2C5SRRXHM", - "2C4ZD49PF", - "2C6V3D44K", - "2C4UB1UZA", - "2C5S1R21W", - "2C5DCRVGM", - "2C686X8ZH", - "2C3RWCVAG", - "2C3JKFMJU", - "2C3VECEG2" - ] - }, - "interpreterRepositories": [ - { - "id": "central", - "type": "default", - "url": "http://repo1.maven.org/maven2/", - "releasePolicy": { - "enabled": true, - "updatePolicy": "daily", - "checksumPolicy": "warn" - }, - "snapshotPolicy": { - "enabled": true, - "updatePolicy": "daily", - "checksumPolicy": "warn" - }, - "mirroredRepositories": [], - "repositoryManager": false - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json deleted file mode 100644 index 1ba1b94..0000000 --- a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json +++ /dev/null @@ -1,42 +0,0 @@ -[ - { - "group": "test", - "name": "double_echo", - "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter", - "properties": { - "property_1": { - "envName": "PROPERTY_1", - "propertyName": "property_1", - "defaultValue": "value_1", - "description": "desc_1" - }, - "property_2": { - "envName": "PROPERTY_2", - "propertyName": "property_2", - "defaultValue": "value_2", - "description": "desc_2" - } - } - }, - - { - "group": "test", - "name": "echo", - "defaultInterpreter": true, - "className": "org.apache.zeppelin.interpreter.EchoInterpreter", - "properties": { - "property_1": { - "envName": "PROPERTY_1", - "propertyName": "property_1", - "defaultValue": "value_1", - "description": "desc_1" - }, - "property_2": { - "envName": "PROPERTY_2", - "propertyName": "property_2", - "defaultValue": "value_2", - "description": "desc_2" - } - } - } -] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties index 6f34691..d8a7839 100644 --- a/zeppelin-interpreter/src/test/resources/log4j.properties +++ b/zeppelin-interpreter/src/test/resources/log4j.properties @@ -26,6 +26,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n # # Root logger option -log4j.rootLogger=INFO, stdout -log4j.logger.org.apache.zeppelin.interpreter=DEBUG -log4j.logger.org.apache.zeppelin.scheduler=DEBUG \ No newline at end of file +log4j.rootLogger=INFO, stdout \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index c1dba5c..cd0210e 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -185,7 +185,7 @@ public class InterpreterRestApi { String noteId = request == null ? null : request.getNoteId(); if (null == noteId) { - interpreterSettingManager.close(settingId); + interpreterSettingManager.close(setting); } else { interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal()); } @@ -208,7 +208,7 @@ public class InterpreterRestApi { @GET @ZeppelinApi public Response listInterpreter(String message) { - Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates(); + Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings(); return new JsonResponse<>(Status.OK, "", m).build(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 53ee114..7453470 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -93,11 +93,13 @@ public class ZeppelinServer extends Application { private NotebookRepoSync notebookRepo; private NotebookAuthorization notebookAuthorization; private Credentials credentials; + private DependencyResolver depResolver; public ZeppelinServer() throws Exception { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - + this.depResolver = new DependencyResolver( + conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO)); InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT); @@ -127,26 +129,13 @@ public class ZeppelinServer extends Application { new File(conf.getRelativeDir("zeppelin-web/src/app/spell"))); } - this.schedulerFactory = SchedulerFactory.singleton(); - this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer, - notebookWsServer, notebookWsServer); - this.replFactory = new InterpreterFactory(interpreterSettingManager); - this.notebookRepo = new NotebookRepoSync(conf); - this.noteSearchService = new LuceneSearch(); - this.notebookAuthorization = NotebookAuthorization.init(conf); - this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); - notebook = new Notebook(conf, - notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer, - noteSearchService, notebookAuthorization, credentials); - ZeppelinServer.helium = new Helium( conf.getHeliumConfPath(), conf.getHeliumRegistry(), new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO), "helium-registry-cache"), heliumBundleFactory, - heliumApplicationFactory, - interpreterSettingManager); + heliumApplicationFactory); // create bundle try { @@ -155,6 +144,20 @@ public class ZeppelinServer extends Application { LOG.error(e.getMessage(), e); } + this.schedulerFactory = new SchedulerFactory(); + this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, + new InterpreterOption(true)); + this.replFactory = new InterpreterFactory(conf, notebookWsServer, + notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(), + interpreterSettingManager); + this.notebookRepo = new NotebookRepoSync(conf); + this.noteSearchService = new LuceneSearch(); + this.notebookAuthorization = NotebookAuthorization.init(conf); + this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); + notebook = new Notebook(conf, + notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer, + noteSearchService, notebookAuthorization, credentials); + // to update notebook from application event from remote process. heliumApplicationFactory.setNotebook(notebook); // to update fire websocket event on application event. @@ -203,7 +206,7 @@ public class ZeppelinServer extends Application { LOG.info("Shutting down Zeppelin Server ... "); try { jettyWebServer.stop(); - notebook.getInterpreterSettingManager().close(); + notebook.getInterpreterSettingManager().shutdown(); notebook.close(); Thread.sleep(3000); } catch (Exception e) {