Repository: zeppelin
Updated Branches:
  refs/heads/master 645037b36 -> a1e69add4


ZEPPELIN-3355. Support inline configuration for user session

### What is this PR for?
Generic ConfInterpreter only support process level configuration, this PR is 
trying to introduce UserSessionConfInterpreter which can customize session 
level configuration, One example is for livy interpreter that can customize 
livy session.

### What type of PR is it?
[Feature ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3355

### How should this be tested?
* Ci pass and manually test livy interpreter

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2883 from zjffdu/ZEPPELIN-3355 and squashes the following commits:

6708ef0 [Jeff Zhang] ZEPPELIN-3355. Support inline configuration for session


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a1e69add
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a1e69add
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a1e69add

Branch: refs/heads/master
Commit: a1e69add442d112e00dbcf76b0abb21bc984e5be
Parents: 645037b
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Mar 20 17:35:13 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Sun Mar 25 10:56:44 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/interpreter/ConfInterpreter.java   |  7 +-
 .../interpreter/InterpreterSetting.java         | 17 ++++-
 .../interpreter/InterpreterSettingManager.java  |  4 --
 .../interpreter/SessionConfInterpreter.java     | 71 ++++++++++++++++++++
 .../interpreter/remote/RemoteInterpreter.java   |  6 ++
 .../interpreter/SessionConfInterpreterTest.java | 69 +++++++++++++++++++
 6 files changed, 166 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1e69add/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java
index d50c57b..7d1df9b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java
@@ -33,14 +33,17 @@ public class ConfInterpreter extends Interpreter {
 
   private static Logger LOGGER = 
LoggerFactory.getLogger(ConfInterpreter.class);
 
-  private String interpreterGroupId;
-  private InterpreterSetting interpreterSetting;
+  protected String sessionId;
+  protected String interpreterGroupId;
+  protected InterpreterSetting interpreterSetting;
 
 
   public ConfInterpreter(Properties properties,
+                         String sessionId,
                          String interpreterGroupId,
                          InterpreterSetting interpreterSetting) {
     super(properties);
+    this.sessionId = sessionId;
     this.interpreterGroupId = interpreterGroupId;
     this.interpreterSetting = interpreterSetting;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1e69add/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index a3b7f8d..816499c 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -689,7 +689,16 @@ public class InterpreterSetting {
       LOGGER.info("Interpreter {} created for user: {}, sessionId: {}",
           interpreter.getClassName(), user, sessionId);
     }
-    interpreters.add(new ConfInterpreter(intpProperties, interpreterGroupId, 
this));
+
+    // TODO(zjffdu) this kind of hardcode is ugly. For now 
SessionConfInterpreter is used
+    // for livy, we could add new property in interpreter-setting.json when 
there's new interpreter
+    // require SessionConfInterpreter
+    if (group.equals("livy")) {
+      interpreters.add(
+          new SessionConfInterpreter(intpProperties, sessionId, 
interpreterGroupId, this));
+    } else {
+      interpreters.add(new ConfInterpreter(intpProperties, sessionId, 
interpreterGroupId, this));
+    }
     return interpreters;
   }
 
@@ -751,7 +760,11 @@ public class InterpreterSetting {
     //TODO(zjffdu) It requires user can not create interpreter with name 
`conf`,
     // conf is a reserved word of interpreter name
     if (replName.equals("conf")) {
-      return ConfInterpreter.class.getName();
+      if (group.equals("livy")) {
+        return SessionConfInterpreter.class.getName();
+      } else {
+        return ConfInterpreter.class.getName();
+      }
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1e69add/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 23d086d..a5184c2 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -34,8 +34,6 @@ import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
-import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
-import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
@@ -45,10 +43,8 @@ import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
 import org.apache.zeppelin.util.ReflectionUtils;
 import org.apache.zeppelin.storage.ConfigStorage;
-import org.apache.zeppelin.storage.FileSystemConfigStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositoryException;
 import org.sonatype.aether.repository.Proxy;
 import org.sonatype.aether.repository.RemoteRepository;
 import org.sonatype.aether.repository.Authentication;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1e69add/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java
new file mode 100644
index 0000000..e303ee6
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Properties;
+
+public class SessionConfInterpreter extends ConfInterpreter {
+
+  private static Logger LOGGER = 
LoggerFactory.getLogger(SessionConfInterpreter.class);
+
+  public SessionConfInterpreter(Properties properties,
+                                String sessionId,
+                                String interpreterGroupId,
+                                InterpreterSetting interpreterSetting) {
+    super(properties, sessionId, interpreterGroupId, interpreterSetting);
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context)
+      throws InterpreterException {
+    try {
+      Properties finalProperties = new Properties();
+      finalProperties.putAll(this.properties);
+      Properties updatedProperties = new Properties();
+      updatedProperties.load(new StringReader(st));
+      finalProperties.putAll(updatedProperties);
+      LOGGER.debug("Properties for Session: " + sessionId + ": " + 
finalProperties);
+
+      List<Interpreter> interpreters =
+          
interpreterSetting.getInterpreterGroup(interpreterGroupId).get(sessionId);
+      for (Interpreter intp : interpreters) {
+        // only check the RemoteInterpreter, ConfInterpreter itself will be 
ignored here.
+        if (intp instanceof RemoteInterpreter) {
+          RemoteInterpreter remoteInterpreter = (RemoteInterpreter) intp;
+          if (remoteInterpreter.isOpened()) {
+            return new InterpreterResult(InterpreterResult.Code.ERROR,
+                "Can not change interpreter session properties after this 
session is started");
+          }
+          remoteInterpreter.setProperties(finalProperties);
+        }
+      }
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+    } catch (IOException e) {
+      LOGGER.error("Fail to update interpreter setting", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, 
ExceptionUtils.getStackTrace(e));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1e69add/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index f38d037..34ed804 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
@@ -88,6 +89,11 @@ public class RemoteInterpreter extends Interpreter {
     return isOpened;
   }
 
+  @VisibleForTesting
+  public void setOpened(boolean opened) {
+    isOpened = opened;
+  }
+
   @Override
   public String getClassName() {
     return className;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a1e69add/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java
new file mode 100644
index 0000000..7baa2e2
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SessionConfInterpreterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SessionConfInterpreterTest {
+
+  @Test
+  public void testUserSessionConfInterpreter() throws InterpreterException {
+
+    InterpreterSetting mockInterpreterSetting = mock(InterpreterSetting.class);
+    ManagedInterpreterGroup mockInterpreterGroup = 
mock(ManagedInterpreterGroup.class);
+    
when(mockInterpreterSetting.getInterpreterGroup("group_1")).thenReturn(mockInterpreterGroup);
+
+    Properties properties = new Properties();
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("property_2", "value_2");
+    SessionConfInterpreter confInterpreter = new SessionConfInterpreter(
+        properties, "session_1", "group_1", mockInterpreterSetting);
+
+    RemoteInterpreter remoteInterpreter =
+        new RemoteInterpreter(properties, "session_1", "clasName", "user1", 
null);
+    List<Interpreter> interpreters = new ArrayList<>();
+    interpreters.add(confInterpreter);
+    interpreters.add(remoteInterpreter);
+    when(mockInterpreterGroup.get("session_1")).thenReturn(interpreters);
+
+    InterpreterResult result =
+        
confInterpreter.interpret("property_1\tupdated_value_1\nproperty_3\tvalue_3",
+            mock(InterpreterContext.class));
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code);
+    assertEquals(3, remoteInterpreter.getProperties().size());
+    assertEquals("updated_value_1", 
remoteInterpreter.getProperty("property_1"));
+    assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
+    assertEquals("value_3", remoteInterpreter.getProperty("property_3"));
+
+    remoteInterpreter.setOpened(true);
+    result =
+        
confInterpreter.interpret("property_1\tupdated_value_1\nproperty_3\tvalue_3",
+            mock(InterpreterContext.class));
+    assertEquals(InterpreterResult.Code.ERROR, result.code);
+  }
+}

Reply via email to