Repository: zeppelin Updated Branches: refs/heads/branch-0.8 f058f2a41 -> fbbb46b46
ZEPPELIN-3355. Support inline configuration for user session ### What is this PR for? The generic ConfInterpreter only supports process-level configuration. This PR introduces SessionConfInterpreter, which can customize session-level configuration. One example is the livy interpreter, which can use it to customize a livy session. ### What type of PR is it? [Feature ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3355 ### How should this be tested? * CI passes; manually test the livy interpreter ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2883 from zjffdu/ZEPPELIN-3355 and squashes the following commits: 6708ef0 [Jeff Zhang] ZEPPELIN-3355. Support inline configuration for session (cherry picked from commit a1e69add442d112e00dbcf76b0abb21bc984e5be) Signed-off-by: Jeff Zhang <zjf...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/fbbb46b4 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/fbbb46b4 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/fbbb46b4 Branch: refs/heads/branch-0.8 Commit: fbbb46b468b171e72232f5497e6d494d60f37ec3 Parents: f058f2a Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Mar 20 17:35:13 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Sun Mar 25 10:56:57 2018 +0800 ---------------------------------------------------------------------- .../zeppelin/interpreter/ConfInterpreter.java | 7 +- .../interpreter/InterpreterSetting.java | 17 ++++- .../interpreter/InterpreterSettingManager.java | 4 -- .../interpreter/SessionConfInterpreter.java | 71 ++++++++++++++++++++ .../interpreter/remote/RemoteInterpreter.java | 6 ++ .../interpreter/SessionConfInterpreterTest.java | 69 +++++++++++++++++++ 6 files 
changed, 166 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fbbb46b4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java index d50c57b..7d1df9b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java @@ -33,14 +33,17 @@ public class ConfInterpreter extends Interpreter { private static Logger LOGGER = LoggerFactory.getLogger(ConfInterpreter.class); - private String interpreterGroupId; - private InterpreterSetting interpreterSetting; + protected String sessionId; + protected String interpreterGroupId; + protected InterpreterSetting interpreterSetting; public ConfInterpreter(Properties properties, + String sessionId, String interpreterGroupId, InterpreterSetting interpreterSetting) { super(properties); + this.sessionId = sessionId; this.interpreterGroupId = interpreterGroupId; this.interpreterSetting = interpreterSetting; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fbbb46b4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index a3b7f8d..816499c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -689,7 +689,16 
@@ public class InterpreterSetting { LOGGER.info("Interpreter {} created for user: {}, sessionId: {}", interpreter.getClassName(), user, sessionId); } - interpreters.add(new ConfInterpreter(intpProperties, interpreterGroupId, this)); + + // TODO(zjffdu) this kind of hardcode is ugly. For now SessionConfInterpreter is used + // for livy, we could add new property in interpreter-setting.json when there's new interpreter + // require SessionConfInterpreter + if (group.equals("livy")) { + interpreters.add( + new SessionConfInterpreter(intpProperties, sessionId, interpreterGroupId, this)); + } else { + interpreters.add(new ConfInterpreter(intpProperties, sessionId, interpreterGroupId, this)); + } return interpreters; } @@ -751,7 +760,11 @@ public class InterpreterSetting { //TODO(zjffdu) It requires user can not create interpreter with name `conf`, // conf is a reserved word of interpreter name if (replName.equals("conf")) { - return ConfInterpreter.class.getName(); + if (group.equals("livy")) { + return SessionConfInterpreter.class.getName(); + } else { + return ConfInterpreter.class.getName(); + } } return null; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fbbb46b4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 5eacbbb..f3384c1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -34,8 +34,6 @@ import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.helium.ApplicationEventListener; import 
org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; -import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage; -import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; @@ -45,10 +43,8 @@ import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.ResourceSet; import org.apache.zeppelin.util.ReflectionUtils; import org.apache.zeppelin.storage.ConfigStorage; -import org.apache.zeppelin.storage.FileSystemConfigStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sonatype.aether.RepositoryException; import org.sonatype.aether.repository.Proxy; import org.sonatype.aether.repository.RemoteRepository; import org.sonatype.aether.repository.Authentication; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fbbb46b4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java new file mode 100644 index 0000000..e303ee6 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringReader; +import java.util.List; +import java.util.Properties; + +public class SessionConfInterpreter extends ConfInterpreter { + + private static Logger LOGGER = LoggerFactory.getLogger(SessionConfInterpreter.class); + + public SessionConfInterpreter(Properties properties, + String sessionId, + String interpreterGroupId, + InterpreterSetting interpreterSetting) { + super(properties, sessionId, interpreterGroupId, interpreterSetting); + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { + try { + Properties finalProperties = new Properties(); + finalProperties.putAll(this.properties); + Properties updatedProperties = new Properties(); + updatedProperties.load(new StringReader(st)); + finalProperties.putAll(updatedProperties); + LOGGER.debug("Properties for Session: " + sessionId + ": " + finalProperties); + + List<Interpreter> interpreters = + interpreterSetting.getInterpreterGroup(interpreterGroupId).get(sessionId); + for (Interpreter intp : interpreters) { + // only check the RemoteInterpreter, ConfInterpreter itself will be ignored here. 
+ if (intp instanceof RemoteInterpreter) { + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) intp; + if (remoteInterpreter.isOpened()) { + return new InterpreterResult(InterpreterResult.Code.ERROR, + "Can not change interpreter session properties after this session is started"); + } + remoteInterpreter.setProperties(finalProperties); + } + } + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } catch (IOException e) { + LOGGER.error("Fail to update interpreter setting", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fbbb46b4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index f38d037..34ed804 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; @@ -88,6 +89,11 @@ public class RemoteInterpreter extends Interpreter { return isOpened; } + @VisibleForTesting + public void setOpened(boolean opened) { + isOpened = opened; + } + @Override public String getClassName() { return className; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fbbb46b4/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java new file mode 100644 index 0000000..7baa2e2 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SessionConfInterpreterTest { + + @Test + public void testUserSessionConfInterpreter() throws InterpreterException { + + InterpreterSetting mockInterpreterSetting = mock(InterpreterSetting.class); + ManagedInterpreterGroup mockInterpreterGroup = mock(ManagedInterpreterGroup.class); + when(mockInterpreterSetting.getInterpreterGroup("group_1")).thenReturn(mockInterpreterGroup); + + Properties properties = new Properties(); + properties.setProperty("property_1", "value_1"); + properties.setProperty("property_2", "value_2"); + SessionConfInterpreter confInterpreter = new SessionConfInterpreter( + properties, "session_1", "group_1", mockInterpreterSetting); + + RemoteInterpreter remoteInterpreter = + new RemoteInterpreter(properties, "session_1", "clasName", "user1", null); + List<Interpreter> interpreters = new ArrayList<>(); + interpreters.add(confInterpreter); + interpreters.add(remoteInterpreter); + when(mockInterpreterGroup.get("session_1")).thenReturn(interpreters); + + InterpreterResult result = + confInterpreter.interpret("property_1\tupdated_value_1\nproperty_3\tvalue_3", + mock(InterpreterContext.class)); + assertEquals(InterpreterResult.Code.SUCCESS, result.code); + assertEquals(3, remoteInterpreter.getProperties().size()); + assertEquals("updated_value_1", remoteInterpreter.getProperty("property_1")); + assertEquals("value_2", remoteInterpreter.getProperty("property_2")); + assertEquals("value_3", remoteInterpreter.getProperty("property_3")); + + remoteInterpreter.setOpened(true); + result = + confInterpreter.interpret("property_1\tupdated_value_1\nproperty_3\tvalue_3", + 
mock(InterpreterContext.class)); + assertEquals(InterpreterResult.Code.ERROR, result.code); + } +}