http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 102ca1a..3ddeec0 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -41,7 +41,12 @@ import 
org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -458,8 +463,7 @@ public class NotebookServer extends WebSocketServlet
     Notebook notebook = notebook();
     List<Note> notes = notebook.getAllNotes();
     for (Note note : notes) {
-      List<String> ids = notebook.getInterpreterSettingManager()
-          .getInterpreterBinding(note.getId());
+      List<String> ids = 
notebook.getInterpreterSettingManager().getInterpreters(note.getId());
       for (String id : ids) {
         if (id.equals(interpreterGroupId)) {
           broadcast(note.getId(), m);
@@ -999,7 +1003,7 @@ public class NotebookServer extends WebSocketServlet
         List<String> interpreterSettingIds = new LinkedList<>();
         interpreterSettingIds.add(defaultInterpreterId);
         for (String interpreterSettingId : 
notebook.getInterpreterSettingManager().
-            getInterpreterSettingIds()) {
+            getDefaultInterpreterSettingList()) {
           if (!interpreterSettingId.equals(defaultInterpreterId)) {
             interpreterSettingIds.add(interpreterSettingId);
           }
@@ -1359,13 +1363,12 @@ public class NotebookServer extends WebSocketServlet
       List<InterpreterSetting> settings =
           
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
       for (InterpreterSetting setting : settings) {
-        if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
+        if (setting.getInterpreterGroup(user, note.getId()) == null) {
           continue;
         }
-        if 
(interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, 
note.getId())
-            .getId())) {
+        if (interpreterGroupId.equals(setting.getInterpreterGroup(user, 
note.getId()).getId())) {
           AngularObjectRegistry angularObjectRegistry =
-              setting.getOrCreateInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
+              setting.getInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
 
           // first trying to get local registry
           ao = angularObjectRegistry.get(varName, noteId, paragraphId);
@@ -1402,13 +1405,12 @@ public class NotebookServer extends WebSocketServlet
         List<InterpreterSetting> settings =
             
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
         for (InterpreterSetting setting : settings) {
-          if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
+          if (setting.getInterpreterGroup(user, n.getId()) == null) {
             continue;
           }
-          if 
(interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
-              .getId())) {
+          if (interpreterGroupId.equals(setting.getInterpreterGroup(user, 
n.getId()).getId())) {
             AngularObjectRegistry angularObjectRegistry =
-                setting.getOrCreateInterpreterGroup(user, 
n.getId()).getAngularObjectRegistry();
+                setting.getInterpreterGroup(user, 
n.getId()).getAngularObjectRegistry();
             this.broadcastExcept(n.getId(),
                 new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
                     .put("interpreterGroupId", 
interpreterGroupId).put("noteId", n.getId())
@@ -2281,13 +2283,13 @@ public class NotebookServer extends WebSocketServlet
 
     for (InterpreterSetting intpSetting : settings) {
       AngularObjectRegistry registry =
-          intpSetting.getOrCreateInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
+          intpSetting.getInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
       List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
       for (AngularObject object : objects) {
         conn.send(serializeMessage(
             new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
                 .put("interpreterGroupId",
-                    intpSetting.getOrCreateInterpreterGroup(user, 
note.getId()).getId())
+                    intpSetting.getInterpreterGroup(user, 
note.getId()).getId())
                 .put("noteId", note.getId()).put("paragraphId", 
object.getParagraphId())));
       }
     }
@@ -2333,7 +2335,7 @@ public class NotebookServer extends WebSocketServlet
       }
 
       List<String> settingIds =
-          
notebook.getInterpreterSettingManager().getInterpreterBinding(note.getId());
+          
notebook.getInterpreterSettingManager().getInterpreters(note.getId());
       for (String id : settingIds) {
         if (interpreterGroupId.contains(id)) {
           broadcast(note.getId(),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
new file mode 100644
index 0000000..1b1306a
--- /dev/null
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.mock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+/**
+ * Mock {@link Interpreter} that echoes the submitted text back with a
+ * {@code "repl1: "} prefix and does nothing in its lifecycle methods.
+ */
+public class MockInterpreter1 extends Interpreter {
+  Map<String, Object> vars = new HashMap<>();
+
+  public MockInterpreter1(Properties property) {
+    super(property);
+  }
+
+  /** Nothing to set up. */
+  @Override
+  public void open() {
+  }
+
+  /** Nothing to tear down. */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Always reports success.
+   *
+   * @param st text to "interpret"
+   * @param context ignored
+   * @return a SUCCESS result whose message is {@code "repl1: "} followed by {@code st}
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    final String echoed = "repl1: " + st;
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS, echoed);
+  }
+
+  /** Cancellation is a no-op. */
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  /** @return always {@code FormType.SIMPLE} */
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  /** @return always {@code 0} */
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  /**
+   * @return the FIFO scheduler registered under {@code "test_"} plus this instance's hash code
+   */
+  @Override
+  public Scheduler getScheduler() {
+    final String schedulerName = "test_" + this.hashCode();
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName);
+  }
+
+  /** Completion is not supported. @return always {@code null} */
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index e2f171f..a7907db 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -307,9 +307,10 @@ public abstract class AbstractTestRestApi {
   protected static void shutDown() throws Exception {
     if (!wasRunning) {
       // restart interpreter to stop all interpreter processes
-      List<InterpreterSetting> settingList = 
ZeppelinServer.notebook.getInterpreterSettingManager().get();
-      for (InterpreterSetting setting : settingList) {
-        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
+      List<String> settingList = 
ZeppelinServer.notebook.getInterpreterSettingManager()
+          .getDefaultInterpreterSettingList();
+      for (String setting : settingList) {
+        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting);
       }
       if (shiroIni != null) {
         FileUtils.deleteQuietly(shiroIni);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
index 72dd8a7..28541bd 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
@@ -80,7 +80,7 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
 
     // then
     assertThat(get, isAllowed());
-    
assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettingTemplates().size(),
+    
assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getAvailableInterpreterSettings().size(),
         body.entrySet().size());
     get.releaseConnection();
   }
@@ -110,7 +110,7 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
   @Test
   public void testSettingsCRUD() throws IOException {
     // when: call create setting API
-    String rawRequest = "{\"name\":\"md3\",\"group\":\"md\"," +
+    String rawRequest = "{\"name\":\"md2\",\"group\":\"md\"," +
         "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": 
\"propname\", \"type\": \"textarea\"}}," +
         
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}],"
 +
         "\"dependencies\":[]," +
@@ -367,7 +367,7 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
 
   @Test
   public void testGetMetadataInfo() throws IOException {
-    String jsonRequest = "{\"name\":\"spark_new\",\"group\":\"spark\"," +
+    String jsonRequest = "{\"name\":\"spark\",\"group\":\"spark\"," +
             "\"properties\":{\"propname\": {\"value\": \"propvalue\", 
\"name\": \"propname\", \"type\": \"textarea\"}}," +
             
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}],"
 +
             "\"dependencies\":[]," +

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 10d77b2..8da36a6 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -30,7 +30,6 @@ import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
-import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.AfterClass;
@@ -96,7 +95,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
   }
 
   @Test
-  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() 
throws IOException, InterruptedException {
+  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() 
throws IOException {
     // create a notebook
     Note note1 = notebook.createNote(anonymous);
 
@@ -105,7 +104,7 @@ public class NotebookServerTest extends AbstractTestRestApi 
{
     List<InterpreterSetting> settings = 
notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId());
     for (InterpreterSetting setting : settings) {
       if (setting.getName().equals("md")) {
-        interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", 
"sharedProcess");
+        interpreterGroup = setting.getInterpreterGroup("anonymous", 
"sharedProcess");
         break;
       }
     }
@@ -116,14 +115,6 @@ public class NotebookServerTest extends 
AbstractTestRestApi {
     p1.setAuthenticationInfo(anonymous);
     note1.run(p1.getId());
 
-    // wait for paragraph finished
-    while(true) {
-      if (p1.getStatus() == Job.Status.FINISHED) {
-        break;
-      }
-      Thread.sleep(100);
-    }
-
     // add angularObject
     interpreterGroup.getAngularObjectRegistry().add("object1", "value1", 
note1.getId(), null);
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties 
b/zeppelin-server/src/test/resources/log4j.properties
index 8368993..b0d1067 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -33,6 +33,7 @@ log4j.logger.org.apache.hadoop.mapred=WARN
 log4j.logger.org.apache.hadoop.hive.ql=WARN
 log4j.logger.org.apache.hadoop.hive.metastore=WARN
 log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN
+log4j.logger.org.apache.zeppelin.scheduler=WARN
 
 log4j.logger.org.quartz=WARN
 log4j.logger.DataNucleus=WARN

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 3ae382a..b3d5c63 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -61,14 +61,6 @@
     </dependency>
 
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>zeppelin-interpreter</artifactId>
-      <version>${project.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -79,6 +71,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
new file mode 100644
index 0000000..f00fe93
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.conf;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.GitNotebookRepo;
+import org.apache.zeppelin.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeppelin configuration.
+ *
+ */
+public class ZeppelinConfiguration extends XMLConfiguration {
+  /** Resource name of the site configuration file looked up by {@code create()}. */
+  private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
+  private static final long serialVersionUID = 4749305895693848035L;
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
+
+  private static final String HELIUM_PACKAGE_DEFAULT_URL =
+      "https://s3.amazonaws.com/helium-package/helium.json";
+  /** Instance cached by {@code create()}; {@code null} until it has been built. */
+  private static ZeppelinConfiguration conf;
+
+  /**
+   * Builds a configuration from the XML document at {@code url}.
+   *
+   * @param url location handed to {@code load}
+   * @throws ConfigurationException declared for failures raised while loading {@code url}
+   */
+  public ZeppelinConfiguration(URL url) throws ConfigurationException {
+    // Delimiter parsing is switched off before load(); presumably so that values
+    // containing the list delimiter are kept as single strings -- TODO confirm.
+    setDelimiterParsingDisabled(true);
+    load(url);
+  }
+
+  /**
+   * Builds a configuration populated from every {@code ConfVars} entry: each entry is stored
+   * under its variable name, using the value accessor that matches its declared type.
+   *
+   * @throws RuntimeException if an entry reports a type other than BOOLEAN, LONG, INT,
+   *     FLOAT or STRING
+   */
+  public ZeppelinConfiguration() {
+    for (ConfVars confVar : ConfVars.values()) {
+      final Object typedValue;
+      if (confVar.getType() == ConfVars.VarType.BOOLEAN) {
+        typedValue = confVar.getBooleanValue();
+      } else if (confVar.getType() == ConfVars.VarType.LONG) {
+        typedValue = confVar.getLongValue();
+      } else if (confVar.getType() == ConfVars.VarType.INT) {
+        typedValue = confVar.getIntValue();
+      } else if (confVar.getType() == ConfVars.VarType.FLOAT) {
+        typedValue = confVar.getFloatValue();
+      } else if (confVar.getType() == ConfVars.VarType.STRING) {
+        typedValue = confVar.getStringValue();
+      } else {
+        throw new RuntimeException("Unsupported VarType");
+      }
+      this.setProperty(confVar.getVarName(), typedValue);
+    }
+  }
+
+
+  /**
+   * Returns the shared configuration, building it on the first call.
+   *
+   * <p>The first call looks for {@code zeppelin-site.xml} relative to this class, then through
+   * this class's class loader, then through the thread context class loader. If no resource
+   * is found, or loading it fails, a configuration built by the no-arg constructor is used
+   * instead. Subsequent calls return the same instance.
+   *
+   * @return the shared configuration; never {@code null}
+   */
+  public static synchronized ZeppelinConfiguration create() {
+    if (conf != null) {
+      return conf;
+    }
+
+    URL url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
+    if (url == null) {
+      ClassLoader cl = ZeppelinConfiguration.class.getClassLoader();
+      if (cl != null) {
+        url = cl.getResource(ZEPPELIN_SITE_XML);
+      }
+    }
+    if (url == null) {
+      // The context class loader may be null as well, so guard it like the one above.
+      ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+      if (contextClassLoader != null) {
+        url = contextClassLoader.getResource(ZEPPELIN_SITE_XML);
+      }
+    }
+
+    if (url == null) {
+      LOG.warn("Failed to load configuration, proceeding with a default");
+      conf = new ZeppelinConfiguration();
+    } else {
+      try {
+        LOG.info("Load configuration from {}", url);
+        conf = new ZeppelinConfiguration(url);
+      } catch (ConfigurationException e) {
+        // Trailing Throwable is logged with its stack trace, not consumed by a placeholder.
+        LOG.warn("Failed to load configuration from {} proceeding with a default", url, e);
+        conf = new ZeppelinConfiguration();
+      }
+    }
+
+    LOG.info("Server Host: {}", conf.getServerAddress());
+    if (!conf.useSsl()) {
+      LOG.info("Server Port: {}", conf.getServerPort());
+    } else {
+      LOG.info("Server SSL Port: {}", conf.getServerSslPort());
+    }
+    LOG.info("Context Path: {}", conf.getServerContextPath());
+    LOG.info("Zeppelin Version: {}", Util.getVersion());
+
+    return conf;
+  }
+
+
+  /**
+   * Finds the {@code value} child of the first root-level node whose first {@code name}
+   * child equals {@code name}.
+   *
+   * @param name property name to look for
+   * @return the first {@code value} node of the matching property, or {@code null} when no
+   *     property has that name or the matching property has no {@code value} child
+   */
+  private ConfigurationNode findValueNode(String name) {
+    List<ConfigurationNode> properties = getRootNode().getChildren();
+    if (properties == null) {
+      return null;
+    }
+    for (ConfigurationNode p : properties) {
+      List<ConfigurationNode> names = p.getChildren("name");
+      if (names != null && !names.isEmpty() && name.equals(names.get(0).getValue())) {
+        List<ConfigurationNode> values = p.getChildren("value");
+        // A name without a value is treated as "not configured" rather than failing
+        // with IndexOutOfBoundsException on get(0).
+        return (values == null || values.isEmpty()) ? null : values.get(0);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Reads a string property from the loaded XML tree.
+   *
+   * @param name property name
+   * @param d value returned when the property is not present
+   * @return the stored value, or {@code d}
+   */
+  private String getStringValue(String name, String d) {
+    ConfigurationNode value = findValueNode(name);
+    return value == null ? d : (String) value.getValue();
+  }
+
+  /**
+   * Reads an int property from the loaded XML tree.
+   *
+   * @param name property name
+   * @param d value returned when the property is not present
+   * @return the parsed value, or {@code d}
+   * @throws NumberFormatException if the stored text is not a valid int
+   */
+  private int getIntValue(String name, int d) {
+    ConfigurationNode value = findValueNode(name);
+    return value == null ? d : Integer.parseInt((String) value.getValue());
+  }
+
+  /**
+   * Reads a long property from the loaded XML tree.
+   *
+   * @param name property name
+   * @param d value returned when the property is not present
+   * @return the parsed value, or {@code d}
+   * @throws NumberFormatException if the stored text is not a valid long
+   */
+  private long getLongValue(String name, long d) {
+    ConfigurationNode value = findValueNode(name);
+    return value == null ? d : Long.parseLong((String) value.getValue());
+  }
+
+  /**
+   * Reads a float property from the loaded XML tree.
+   *
+   * @param name property name
+   * @param d value returned when the property is not present
+   * @return the parsed value, or {@code d}
+   * @throws NumberFormatException if the stored text is not a valid float
+   */
+  private float getFloatValue(String name, float d) {
+    ConfigurationNode value = findValueNode(name);
+    return value == null ? d : Float.parseFloat((String) value.getValue());
+  }
+
+  /**
+   * Reads a boolean property from the loaded XML tree.
+   *
+   * @param name property name
+   * @param d value returned when the property is not present
+   * @return the result of {@code Boolean.parseBoolean} on the stored text, or {@code d}
+   */
+  private boolean getBooleanValue(String name, boolean d) {
+    ConfigurationNode value = findValueNode(name);
+    return value == null ? d : Boolean.parseBoolean((String) value.getValue());
+  }
+
+  /**
+   * Resolves the string setting described by {@code c}.
+   *
+   * @param c supplies the environment variable name ({@code name()}), the property name
+   *     ({@code getVarName()}) and the default ({@code getStringValue()})
+   * @return the value resolved by {@link #getString(String, String, String)}
+   */
+  public String getString(ConfVars c) {
+    return getString(c.name(), c.getVarName(), c.getStringValue());
+  }
+
+  /**
+   * Resolves a string setting, checking the environment first, then JVM system properties,
+   * then the loaded configuration.
+   *
+   * @param envName environment variable to consult first
+   * @param propertyName system property / configuration property name
+   * @param defaultValue value used when none of the sources defines the setting
+   * @return the first value found, or {@code defaultValue}
+   */
+  public String getString(String envName, String propertyName, String defaultValue) {
+    final String fromEnv = System.getenv(envName);
+    if (fromEnv != null) {
+      return fromEnv;
+    }
+    final String fromSystemProperty = System.getProperty(propertyName);
+    if (fromSystemProperty != null) {
+      return fromSystemProperty;
+    }
+    return getStringValue(propertyName, defaultValue);
+  }
+
+  /**
+   * Resolves the int setting described by {@code c}.
+   *
+   * @param c supplies the environment variable name, property name and default
+   * @return the value resolved by {@link #getInt(String, String, int)}
+   */
+  public int getInt(ConfVars c) {
+    return getInt(c.name(), c.getVarName(), c.getIntValue());
+  }
+
+  /**
+   * Resolves an int setting, checking the environment first, then JVM system properties,
+   * then the loaded configuration.
+   *
+   * @param envName environment variable to consult first
+   * @param propertyName system property / configuration property name
+   * @param defaultValue value used when none of the sources defines the setting
+   * @return the first value found, parsed as an int, or {@code defaultValue}
+   * @throws NumberFormatException if the value found is not a valid int
+   */
+  public int getInt(String envName, String propertyName, int defaultValue) {
+    final String fromEnv = System.getenv(envName);
+    if (fromEnv != null) {
+      return Integer.parseInt(fromEnv);
+    }
+    final String fromSystemProperty = System.getProperty(propertyName);
+    if (fromSystemProperty != null) {
+      return Integer.parseInt(fromSystemProperty);
+    }
+    return getIntValue(propertyName, defaultValue);
+  }
+
+  /**
+   * Resolves the long setting described by {@code c}.
+   *
+   * @param c supplies the environment variable name, property name and default
+   * @return the value resolved by {@link #getLong(String, String, long)}
+   */
+  public long getLong(ConfVars c) {
+    return getLong(c.name(), c.getVarName(), c.getLongValue());
+  }
+
+  /**
+   * Resolves a long setting, checking the environment first, then JVM system properties,
+   * then the loaded configuration.
+   *
+   * @param envName environment variable to consult first
+   * @param propertyName system property / configuration property name
+   * @param defaultValue value used when none of the sources defines the setting
+   * @return the first value found, parsed as a long, or {@code defaultValue}
+   * @throws NumberFormatException if the value found is not a valid long
+   */
+  public long getLong(String envName, String propertyName, long defaultValue) {
+    final String fromEnv = System.getenv(envName);
+    if (fromEnv != null) {
+      return Long.parseLong(fromEnv);
+    }
+    final String fromSystemProperty = System.getProperty(propertyName);
+    if (fromSystemProperty != null) {
+      return Long.parseLong(fromSystemProperty);
+    }
+    return getLongValue(propertyName, defaultValue);
+  }
+
+  /**
+   * Resolves the float setting described by {@code c}.
+   *
+   * @param c supplies the environment variable name, property name and default
+   * @return the value resolved by {@link #getFloat(String, String, float)}
+   */
+  public float getFloat(ConfVars c) {
+    return getFloat(c.name(), c.getVarName(), c.getFloatValue());
+  }
+
+  /**
+   * Resolves a float setting, checking the environment first, then JVM system properties,
+   * then the loaded configuration.
+   *
+   * @param envName environment variable to consult first
+   * @param propertyName system property / configuration property name
+   * @param defaultValue value used when none of the sources defines the setting
+   * @return the first value found, parsed as a float, or {@code defaultValue}
+   * @throws NumberFormatException if the value found is not a valid float
+   */
+  public float getFloat(String envName, String propertyName, float defaultValue) {
+    final String fromEnv = System.getenv(envName);
+    if (fromEnv != null) {
+      return Float.parseFloat(fromEnv);
+    }
+    final String fromSystemProperty = System.getProperty(propertyName);
+    if (fromSystemProperty != null) {
+      return Float.parseFloat(fromSystemProperty);
+    }
+    return getFloatValue(propertyName, defaultValue);
+  }
+
+  /**
+   * Resolves the boolean setting described by {@code c}.
+   *
+   * @param c supplies the environment variable name, property name and default
+   * @return the value resolved by {@link #getBoolean(String, String, boolean)}
+   */
+  public boolean getBoolean(ConfVars c) {
+    return getBoolean(c.name(), c.getVarName(), c.getBooleanValue());
+  }
+
+  /**
+   * Resolves a boolean setting, checking the environment first, then JVM system properties,
+   * then the loaded configuration.
+   *
+   * @param envName environment variable to consult first
+   * @param propertyName system property / configuration property name
+   * @param defaultValue value used when none of the sources defines the setting
+   * @return {@code Boolean.parseBoolean} of the first value found, or {@code defaultValue}
+   */
+  public boolean getBoolean(String envName, String propertyName, boolean defaultValue) {
+    final String fromEnv = System.getenv(envName);
+    if (fromEnv != null) {
+      return Boolean.parseBoolean(fromEnv);
+    }
+    final String fromSystemProperty = System.getProperty(propertyName);
+    if (fromSystemProperty != null) {
+      return Boolean.parseBoolean(fromSystemProperty);
+    }
+    return getBooleanValue(propertyName, defaultValue);
+  }
+
+  /** @return the boolean setting {@code ConfVars.ZEPPELIN_SSL} */
+  public boolean useSsl() {
+    return getBoolean(ConfVars.ZEPPELIN_SSL);
+  }
+
+  /** @return the int setting {@code ConfVars.ZEPPELIN_SSL_PORT} */
+  public int getServerSslPort() {
+    return getInt(ConfVars.ZEPPELIN_SSL_PORT);
+  }
+
+  /** @return the boolean setting {@code ConfVars.ZEPPELIN_SSL_CLIENT_AUTH} */
+  public boolean useClientAuth() {
+    return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_ADDR} */
+  public String getServerAddress() {
+    return getString(ConfVars.ZEPPELIN_ADDR);
+  }
+
+  /** @return the int setting {@code ConfVars.ZEPPELIN_PORT} */
+  public int getServerPort() {
+    return getInt(ConfVars.ZEPPELIN_PORT);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_SERVER_CONTEXT_PATH} */
+  public String getServerContextPath() {
+    return getString(ConfVars.ZEPPELIN_SERVER_CONTEXT_PATH);
+  }
+
+  /**
+   * Returns the key store location from {@code ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH}.
+   *
+   * @return the configured path unchanged when it starts with {@code "/"} or is recognised
+   *     by {@code isWindowsPath}; otherwise the path placed under the configuration
+   *     directory and passed through {@code getRelativeDir}
+   */
+  public String getKeyStorePath() {
+    final String path = getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH);
+    // && binds tighter than ||, so isWindowsPath(path) is evaluated whenever the "/"
+    // test does not succeed, including for a null path.
+    final boolean absolute = (path != null && path.startsWith("/")) || isWindowsPath(path);
+    if (absolute) {
+      return path;
+    }
+    return getRelativeDir(getConfDir() + "/" + path);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE} */
+  public String getKeyStoreType() {
+    return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD} */
+  public String getKeyStorePassword() {
+    return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD);
+  }
+
+  /**
+   * @return {@code ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD} when it resolves to a
+   *     non-null value, otherwise the key store password
+   */
+  public String getKeyManagerPassword() {
+    final String keyManagerPassword = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD);
+    return keyManagerPassword != null ? keyManagerPassword : getKeyStorePassword();
+  }
+
+  /**
+   * Returns the trust store location from {@code ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH},
+   * using the key store path when that setting resolves to {@code null}.
+   *
+   * @return the path unchanged when it starts with {@code "/"} or is recognised by
+   *     {@code isWindowsPath}; otherwise the path placed under the configuration directory
+   *     and passed through {@code getRelativeDir}
+   */
+  public String getTrustStorePath() {
+    final String configured = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH);
+    final String path = configured != null ? configured : getKeyStorePath();
+    // && binds tighter than ||, so isWindowsPath(path) is evaluated whenever the "/"
+    // test does not succeed, including for a null path.
+    final boolean absolute = (path != null && path.startsWith("/")) || isWindowsPath(path);
+    if (absolute) {
+      return path;
+    }
+    return getRelativeDir(getConfDir() + "/" + path);
+  }
+
+  /**
+   * @return {@code ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE} when it resolves to a non-null
+   *     value, otherwise the key store type
+   */
+  public String getTrustStoreType() {
+    final String trustStoreType = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE);
+    return trustStoreType != null ? trustStoreType : getKeyStoreType();
+  }
+
+  /**
+   * @return {@code ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD} when it resolves to a non-null
+   *     value, otherwise the key store password
+   */
+  public String getTrustStorePassword() {
+    final String trustStorePassword = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD);
+    return trustStorePassword != null ? trustStorePassword : getKeyStorePassword();
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_DIR} */
+  public String getNotebookDir() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
+  }
+
+  /**
+   * Note: despite the generic method name, this reads the S3 user setting.
+   *
+   * @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_USER}
+   */
+  public String getUser() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET} */
+  public String getBucketName() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_ENDPOINT} */
+  public String getEndpoint() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_ENDPOINT);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID} */
+  public String getS3KMSKeyID() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION} */
+  public String getS3KMSKeyRegion() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_EMP} */
+  public String getS3EncryptionMaterialsProviderClass() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_EMP);
+  }
+
+  /** @return the boolean setting {@code ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE} */
+  public boolean isS3ServerSideEncryption() {
+    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI} */
+  public String getMongoUri() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE} */
+  public String getMongoDatabase() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION} */
+  public String getMongoCollection() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION);
+  }
+
+  /** @return the boolean setting {@code ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT} */
+  public boolean getMongoAutoimport() {
+    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT);
+  }
+
+  /** @return {@code <confDir>/interpreter-list} passed through {@code getRelativeDir} */
+  public String getInterpreterListPath() {
+    return getRelativeDir(String.format("%s/interpreter-list", getConfDir()));
+  }
+
+  /** @return {@code ConfVars.ZEPPELIN_INTERPRETER_DIR} passed through {@code getRelativeDir} */
+  public String getInterpreterDir() {
+    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_INTERPRETER_JSON} */
+  public String getInterpreterJson() {
+    return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON);
+  }
+
+  /** @return {@code <confDir>/interpreter.json} passed through {@code getRelativeDir} */
+  public String getInterpreterSettingPath() {
+    return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
+  }
+
+  /** @return {@code <confDir>/helium.json} passed through {@code getRelativeDir} */
+  public String getHeliumConfPath() {
+    return getRelativeDir(String.format("%s/helium.json", getConfDir()));
+  }
+
+  /** @return {@code ConfVars.ZEPPELIN_HELIUM_REGISTRY} passed through {@code getRelativeDir} */
+  public String getHeliumRegistry() {
+    return getRelativeDir(ConfVars.ZEPPELIN_HELIUM_REGISTRY);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_HELIUM_NODE_INSTALLER_URL} */
+  public String getHeliumNodeInstallerUrl() {
+    return getString(ConfVars.ZEPPELIN_HELIUM_NODE_INSTALLER_URL);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_HELIUM_NPM_INSTALLER_URL} */
+  public String getHeliumNpmInstallerUrl() {
+    return getString(ConfVars.ZEPPELIN_HELIUM_NPM_INSTALLER_URL);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL} */
+  public String getHeliumYarnInstallerUrl() {
+    return getString(ConfVars.ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL);
+  }
+
+  /** @return {@code <confDir>/notebook-authorization.json} passed through {@code getRelativeDir} */
+  public String getNotebookAuthorizationPath() {
+    return getRelativeDir(String.format("%s/notebook-authorization.json", 
getConfDir()));
+  }
+
+  /** @return the boolean setting {@code ConfVars.ZEPPELIN_CREDENTIALS_PERSIST}, boxed */
+  public Boolean credentialsPersist() {
+    return getBoolean(ConfVars.ZEPPELIN_CREDENTIALS_PERSIST);
+  }
+
+  /** @return {@code <confDir>/credentials.json} passed through {@code getRelativeDir} */
+  public String getCredentialsPath() {
+    return getRelativeDir(String.format("%s/credentials.json", getConfDir()));
+  }
+
+  /**
+   * Locates {@code shiro.ini} under the configuration directory.
+   *
+   * @return the resolved path when something exists at that location, otherwise the
+   *     empty string
+   */
+  public String getShiroPath() {
+    final String candidate = getRelativeDir(String.format("%s/shiro.ini", getConfDir()));
+    if (new File(candidate).exists()) {
+      return candidate;
+    }
+    return StringUtils.EMPTY;
+  }
+
+  /**
+   * @return {@code ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER} passed through
+   *     {@code getRelativeDir}
+   */
+  public String getInterpreterRemoteRunnerPath() {
+    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
+  }
+
+  /**
+   * @return {@code ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO} passed through
+   *     {@code getRelativeDir}
+   */
+  public String getInterpreterLocalRepoPath() {
+    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO);
+  }
+
+  /** @return the string setting {@code ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO} */
+  public String getInterpreterMvnRepoPath() {
+    return getString(ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO);
+  }
+
+  /**
+   * Resolves the string value of {@code c} and passes it to {@link #getRelativeDir(String)}.
+   *
+   * @param c variable whose string value is the path to resolve
+   * @return the result of {@code getRelativeDir} on that value
+   */
+  public String getRelativeDir(ConfVars c) {
+    return getRelativeDir(getString(c));
+  }
+
+  /**
+   * Resolves a configured path against ZEPPELIN_HOME.
+   *
+   * @param path configured path; absolute when it starts with "/" or matches
+   *             {@link #isWindowsPath(String)}, otherwise taken as relative
+   * @return {@code path} unchanged when it is absolute, otherwise
+   *         ZEPPELIN_HOME + "/" + path
+   */
+  public String getRelativeDir(String path) {
+    // Parenthesised so the null check guards both absolute-path tests. Previously
+    // "a && b || c" let a null path reach isWindowsPath() and fail with a
+    // NullPointerException; a null path now simply takes the relative branch.
+    if (path != null && (path.startsWith("/") || isWindowsPath(path))) {
+      return path;
+    }
+    return getString(ConfVars.ZEPPELIN_HOME) + "/" + path;
+  }
+
+  /**
+   * Tells whether a path begins with a Windows drive prefix: one ASCII letter,
+   * a colon and a backslash.
+   *
+   * @param path path to test; may be null
+   * @return true when the whole path matches the drive-prefix pattern, false
+   *         otherwise (including for a null path)
+   */
+  public boolean isWindowsPath(String path){
+    // Null guard: String.matches cannot be invoked on a null reference.
+    return path != null && path.matches("^[A-Za-z]:\\\\.*");
+  }
+
+  public boolean isAnonymousAllowed() {
+    return getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED);
+  }
+
+  public boolean isNotebokPublic() {
+    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC);
+  }
+
+  public String getConfDir() {
+    return getString(ConfVars.ZEPPELIN_CONF_DIR);
+  }
+
+  /**
+   * Returns the configured allowed origins as a list.
+   *
+   * @return an empty list when the ZEPPELIN_ALLOWED_ORIGINS setting is empty,
+   *         otherwise the lower-cased setting split on ','
+   */
+  public List<String> getAllowedOrigins() {
+    String configured = getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS);
+    String[] origins = configured.isEmpty() ? new String[0] : configured.toLowerCase().split(",");
+    return Arrays.asList(origins);
+  }
+
+  public String getWebsocketMaxTextMessageSize() {
+    return getString(ConfVars.ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
+  }
+
+  public String getJettyName() {
+    return getString(ConfVars.ZEPPELIN_SERVER_JETTY_NAME);
+  }
+
+
+  public String getXFrameOptions() {
+    return getString(ConfVars.ZEPPELIN_SERVER_XFRAME_OPTIONS);
+  }
+
+  public String getXxssProtection() {
+    return getString(ConfVars.ZEPPELIN_SERVER_X_XSS_PROTECTION);
+  }
+
+  public String getStrictTransport() {
+    return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
+  }
+
+
+  /**
+   * Collects the string form of every configuration variable whose name the
+   * predicate accepts.
+   *
+   * @param conf      configuration the typed values are read from
+   * @param predicate filter applied to each variable name
+   * @return map from variable name to its value's {@code toString()}; variables
+   *         whose value is null are left out
+   */
+  public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
+                                                ConfigurationKeyPredicate predicate) {
+    Map<String, String> dumped = new HashMap<>();
+
+    for (ConfVars confVar : ConfVars.values()) {
+      String name = confVar.getVarName();
+      if (predicate.apply(name)) {
+        // Read the value through the accessor that matches the declared type.
+        Object current;
+        switch (confVar.getType()) {
+          case BOOLEAN:
+            current = conf.getBoolean(confVar);
+            break;
+          case LONG:
+            current = conf.getLong(confVar);
+            break;
+          case INT:
+            current = conf.getInt(confVar);
+            break;
+          case FLOAT:
+            current = conf.getFloat(confVar);
+            break;
+          case STRING:
+            current = conf.getString(confVar);
+            break;
+          default:
+            current = null;
+            break;
+        }
+
+        if (current != null) {
+          dumped.put(name, current.toString());
+        }
+      }
+    }
+    return dumped;
+  }
+
+  /**
+   * Predication whether key/value pair should be included or not
+   */
+  public interface ConfigurationKeyPredicate {
+    boolean apply(String key);
+  }
+
+  /**
+   * Wrapper class.
+   */
+  public static enum ConfVars {
+    ZEPPELIN_HOME("zeppelin.home", "./"),
+    ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"),
+    ZEPPELIN_PORT("zeppelin.server.port", 8080),
+    ZEPPELIN_SERVER_CONTEXT_PATH("zeppelin.server.context.path", "/"),
+    ZEPPELIN_SSL("zeppelin.ssl", false),
+    ZEPPELIN_SSL_PORT("zeppelin.server.ssl.port", 8443),
+    ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false),
+    ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"),
+    ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"),
+    ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""),
+    ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", 
null),
+    ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null),
+    ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null),
+    ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null),
+    ZEPPELIN_WAR("zeppelin.war", "zeppelin-web/dist"),
+    ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"),
+    ZEPPELIN_INTERPRETERS("zeppelin.interpreters", 
"org.apache.zeppelin.spark.SparkInterpreter,"
+        + "org.apache.zeppelin.spark.PySparkInterpreter,"
+        + "org.apache.zeppelin.rinterpreter.RRepl,"
+        + "org.apache.zeppelin.rinterpreter.KnitR,"
+        + "org.apache.zeppelin.spark.SparkRInterpreter,"
+        + "org.apache.zeppelin.spark.SparkSqlInterpreter,"
+        + "org.apache.zeppelin.spark.DepInterpreter,"
+        + "org.apache.zeppelin.markdown.Markdown,"
+        + "org.apache.zeppelin.angular.AngularInterpreter,"
+        + "org.apache.zeppelin.shell.ShellInterpreter,"
+        + "org.apache.zeppelin.livy.LivySparkInterpreter,"
+        + "org.apache.zeppelin.livy.LivySparkSQLInterpreter,"
+        + "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
+        + "org.apache.zeppelin.livy.LivyPySpark3Interpreter,"
+        + "org.apache.zeppelin.livy.LivySparkRInterpreter,"
+        + "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
+        + "org.apache.zeppelin.file.HDFSFileInterpreter,"
+        + "org.apache.zeppelin.pig.PigInterpreter,"
+        + "org.apache.zeppelin.pig.PigQueryInterpreter,"
+        + "org.apache.zeppelin.flink.FlinkInterpreter,"
+        + "org.apache.zeppelin.python.PythonInterpreter,"
+        + "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
+        + "org.apache.zeppelin.python.PythonCondaInterpreter,"
+        + "org.apache.zeppelin.python.PythonDockerInterpreter,"
+        + "org.apache.zeppelin.ignite.IgniteInterpreter,"
+        + "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
+        + "org.apache.zeppelin.lens.LensInterpreter,"
+        + "org.apache.zeppelin.cassandra.CassandraInterpreter,"
+        + "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
+        + "org.apache.zeppelin.kylin.KylinInterpreter,"
+        + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+        + "org.apache.zeppelin.scalding.ScaldingInterpreter,"
+        + "org.apache.zeppelin.jdbc.JDBCInterpreter,"
+        + "org.apache.zeppelin.hbase.HbaseInterpreter,"
+        + "org.apache.zeppelin.bigquery.BigQueryInterpreter,"
+        + "org.apache.zeppelin.beam.BeamInterpreter,"
+        + "org.apache.zeppelin.scio.ScioInterpreter,"
+        + "org.apache.zeppelin.groovy.GroovyInterpreter"
+        ),
+    ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", 
"interpreter-setting.json"),
+    ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
+    ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", 
"local-repo"),
+    ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo",
+        "http://repo1.maven.org/maven2/"),
+    
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 
30000),
+    ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 
10),
+    ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", 
"spark,md,angular,sh,"
+        + 
"livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+        + "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy"),
+    ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 
1024 * 100),
+    ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
+    ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+    // use specified notebook (id) as homescreen
+    ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
+    // whether homescreen notebook will be hidden from notebook list or not
+    ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", 
false),
+    ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
+    ZEPPELIN_NOTEBOOK_S3_ENDPOINT("zeppelin.notebook.s3.endpoint", 
"s3.amazonaws.com"),
+    ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
+    
ZEPPELIN_NOTEBOOK_S3_EMP("zeppelin.notebook.s3.encryptionMaterialsProvider", 
null),
+    ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID("zeppelin.notebook.s3.kmsKeyID", null),
+    ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION("zeppelin.notebook.s3.kmsKeyRegion", 
null),
+    ZEPPELIN_NOTEBOOK_S3_SSE("zeppelin.notebook.s3.sse", false),
+    
ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString",
 null),
+    ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
+    ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
+    ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", 
"zeppelin"),
+    ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", 
"notes"),
+    ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", 
"mongodb://localhost"),
+    ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", 
false),
+    ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", 
GitNotebookRepo.class.getName()),
+    ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
+    // whether by default note is public or private
+    ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true),
+    ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
+        System.getProperty("os.name")
+                .startsWith("Windows") ? "bin/interpreter.cmd" : 
"bin/interpreter.sh"),
+    // Decide when new note is created, interpreter settings will be binded 
automatically or not.
+    
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding",
 true),
+    ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"),
+    ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"),
+    ZEPPELIN_HELIUM_REGISTRY("zeppelin.helium.registry", "helium," + 
HELIUM_PACKAGE_DEFAULT_URL),
+    ZEPPELIN_HELIUM_NODE_INSTALLER_URL("zeppelin.helium.node.installer.url",
+            "https://nodejs.org/dist/"),
+    ZEPPELIN_HELIUM_NPM_INSTALLER_URL("zeppelin.helium.npm.installer.url",
+            "http://registry.npmjs.org/"),
+    
ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL("zeppelin.helium.yarnpkg.installer.url",
+            "https://github.com/yarnpkg/yarn/releases/download/"),
+    // Allows a way to specify a ',' separated list of allowed origins for 
rest and websockets
+    // i.e. http://localhost:8080
+    ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"),
+    ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true),
+    ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true),
+    
ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size",
 "1024000"),
+    ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", 
false),
+    ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", 
"SAMEORIGIN"),
+    ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null),
+    ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", 
"max-age=631138519"),
+    ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
+
+    ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
+    ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
+
+    private String varName;
+    @SuppressWarnings("rawtypes")
+    private Class varClass;
+    private String stringValue;
+    private VarType type;
+    private int intValue;
+    private float floatValue;
+    private boolean booleanValue;
+    private long longValue;
+
+
+    ConfVars(String varName, String varValue) {
+      this.varName = varName;
+      this.varClass = String.class;
+      this.stringValue = varValue;
+      this.intValue = -1;
+      this.floatValue = -1;
+      this.longValue = -1;
+      this.booleanValue = false;
+      this.type = VarType.STRING;
+    }
+
+    ConfVars(String varName, int intValue) {
+      this.varName = varName;
+      this.varClass = Integer.class;
+      this.stringValue = null;
+      this.intValue = intValue;
+      this.floatValue = -1;
+      this.longValue = -1;
+      this.booleanValue = false;
+      this.type = VarType.INT;
+    }
+
+    /**
+     * Declares a configuration variable whose default value is a long.
+     *
+     * @param varName   property name of the variable
+     * @param longValue default value
+     */
+    ConfVars(String varName, long longValue) {
+      this.varName = varName;
+      // Was Integer.class; the recorded class must match the LONG type below,
+      // as it does in the other typed constructors.
+      this.varClass = Long.class;
+      this.stringValue = null;
+      this.intValue = -1;
+      this.floatValue = -1;
+      this.longValue = longValue;
+      this.booleanValue = false;
+      this.type = VarType.LONG;
+    }
+
+    ConfVars(String varName, float floatValue) {
+      this.varName = varName;
+      this.varClass = Float.class;
+      this.stringValue = null;
+      this.intValue = -1;
+      this.longValue = -1;
+      this.floatValue = floatValue;
+      this.booleanValue = false;
+      this.type = VarType.FLOAT;
+    }
+
+    ConfVars(String varName, boolean booleanValue) {
+      this.varName = varName;
+      this.varClass = Boolean.class;
+      this.stringValue = null;
+      this.intValue = -1;
+      this.longValue = -1;
+      this.floatValue = -1;
+      this.booleanValue = booleanValue;
+      this.type = VarType.BOOLEAN;
+    }
+
+    public String getVarName() {
+      return varName;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Class getVarClass() {
+      return varClass;
+    }
+
+    public int getIntValue() {
+      return intValue;
+    }
+
+    public long getLongValue() {
+      return longValue;
+    }
+
+    public float getFloatValue() {
+      return floatValue;
+    }
+
+    public String getStringValue() {
+      return stringValue;
+    }
+
+    public boolean getBooleanValue() {
+      return booleanValue;
+    }
+
+    public VarType getType() {
+      return type;
+    }
+
+    enum VarType {
+      STRING {
+        @Override
+        void checkType(String value) throws Exception {}
+      },
+      INT {
+        @Override
+        void checkType(String value) throws Exception {
+          Integer.valueOf(value);
+        }
+      },
+      LONG {
+        @Override
+        void checkType(String value) throws Exception {
+          Long.valueOf(value);
+        }
+      },
+      FLOAT {
+        @Override
+        void checkType(String value) throws Exception {
+          Float.valueOf(value);
+        }
+      },
+      BOOLEAN {
+        @Override
+        void checkType(String value) throws Exception {
+          Boolean.valueOf(value);
+        }
+      };
+
+      boolean isType(String value) {
+        try {
+          checkType(value);
+        } catch (Exception e) {
+          LOG.error("Exception in ZeppelinConfiguration while isType", e);
+          return false;
+        }
+        return true;
+      }
+
+      String typeString() {
+        return name().toUpperCase();
+      }
+
+      abstract void checkType(String value) throws Exception;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
index 5eecd6b..17a3529 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
@@ -19,12 +19,11 @@ package org.apache.zeppelin.helium;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.notebook.Paragraph;
-import org.apache.zeppelin.resource.*;
+import org.apache.zeppelin.resource.DistributedResourcePool;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
+import org.apache.zeppelin.resource.ResourceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,22 +47,19 @@ public class Helium {
 
   private final HeliumBundleFactory bundleFactory;
   private final HeliumApplicationFactory applicationFactory;
-  private final InterpreterSettingManager interpreterSettingManager;
 
   public Helium(
       String heliumConfPath,
       String registryPaths,
       File registryCacheDir,
       HeliumBundleFactory bundleFactory,
-      HeliumApplicationFactory applicationFactory,
-      InterpreterSettingManager interpreterSettingManager)
+      HeliumApplicationFactory applicationFactory)
       throws IOException {
     this.heliumConfPath = heliumConfPath;
     this.registryPaths = registryPaths;
     this.registryCacheDir = registryCacheDir;
     this.bundleFactory = bundleFactory;
     this.applicationFactory = applicationFactory;
-    this.interpreterSettingManager = interpreterSettingManager;
     heliumConf = loadConf(heliumConfPath);
     allPackages = getAllPackageInfo();
   }
@@ -354,7 +350,7 @@ public class Helium {
         allResources = resourcePool.getAll();
       }
     } else {
-      allResources = interpreterSettingManager.getAllResources();
+      allResources = ResourcePoolUtils.getAllResources();
     }
 
     for (List<HeliumPackageSearchResult> pkgs : allPackages.values()) {
@@ -482,39 +478,4 @@ public class Helium {
 
     return mixed;
   }
-
-  public ResourceSet getAllResources() {
-    return getAllResourcesExcept(null);
-  }
-
-  private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) 
{
-    ResourceSet resourceSet = new ResourceSet();
-    for (InterpreterGroup intpGroup : 
interpreterSettingManager.getAllInterpreterGroup()) {
-      if (interpreterGroupExcludsion != null &&
-          intpGroup.getId().equals(interpreterGroupExcludsion)) {
-        continue;
-      }
-
-      RemoteInterpreterProcess remoteInterpreterProcess = 
intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-      } else if (remoteInterpreterProcess.isRunning()) {
-        List<String> resourceList = 
remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(RemoteInterpreterService.Client client) 
throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            }
-        );
-        for (String res : resourceList) {
-          resourceSet.add(Resource.fromJson(res));
-        }
-      }
-    }
-    return resourceSet;
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
index 5f5330c..84368a7 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
@@ -105,33 +105,39 @@ public class HeliumApplicationFactory implements 
ApplicationEventListener, Noteb
     private void load(RemoteInterpreterProcess intpProcess, ApplicationState 
appState)
         throws Exception {
 
+      RemoteInterpreterService.Client client = null;
+
       synchronized (appState) {
         if (appState.getStatus() == ApplicationState.Status.LOADED) {
           // already loaded
           return;
         }
 
-        appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADING);
-        final String pkgInfo = pkg.toJson();
-        final String appId = appState.getId();
-
-        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
-            new 
RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
-              @Override
-              public RemoteApplicationResult 
call(RemoteInterpreterService.Client client)
-                  throws Exception {
-                return client.loadApplication(
-                    appId,
-                    pkgInfo,
-                    paragraph.getNote().getId(),
-                    paragraph.getId());
-              }
-            }
-        );
-        if (ret.isSuccess()) {
-          appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADED);
-        } else {
-          throw new ApplicationException(ret.getMsg());
+        try {
+          appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADING);
+          String pkgInfo = pkg.toJson();
+          String appId = appState.getId();
+
+          client = intpProcess.getClient();
+          RemoteApplicationResult ret = client.loadApplication(
+              appId,
+              pkgInfo,
+              paragraph.getNote().getId(),
+              paragraph.getId());
+
+          if (ret.isSuccess()) {
+            appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADED);
+          } else {
+            throw new ApplicationException(ret.getMsg());
+          }
+        } catch (TException e) {
+          intpProcess.releaseBrokenClient(client);
+          client = null; // handed back as broken; finally must not release it again
+          throw e;
+        } finally {
+          if (client != null) {
+            intpProcess.releaseClient(client);
+          }
         }
       }
     }
@@ -194,7 +199,7 @@ public class HeliumApplicationFactory implements 
ApplicationEventListener, Noteb
       }
     }
 
-    private void unload(final ApplicationState appsToUnload) throws 
ApplicationException {
+    private void unload(ApplicationState appsToUnload) throws 
ApplicationException {
       synchronized (appsToUnload) {
         if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
           throw new ApplicationException(
@@ -212,19 +217,26 @@ public class HeliumApplicationFactory implements 
ApplicationEventListener, Noteb
           throw new ApplicationException("Target interpreter process is not 
running");
         }
 
-        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
-            new 
RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
-              @Override
-              public RemoteApplicationResult 
call(RemoteInterpreterService.Client client)
-                  throws Exception {
-                return client.unloadApplication(appsToUnload.getId());
-              }
-            }
-        );
-        if (ret.isSuccess()) {
-          appStatusChange(paragraph, appsToUnload.getId(), 
ApplicationState.Status.UNLOADED);
-        } else {
-          throw new ApplicationException(ret.getMsg());
+        RemoteInterpreterService.Client client;
+        try {
+          client = intpProcess.getClient();
+        } catch (Exception e) {
+          throw new ApplicationException(e);
+        }
+
+        try {
+          RemoteApplicationResult ret = 
client.unloadApplication(appsToUnload.getId());
+
+          if (ret.isSuccess()) {
+            appStatusChange(paragraph, appsToUnload.getId(), 
ApplicationState.Status.UNLOADED);
+          } else {
+            throw new ApplicationException(ret.getMsg());
+          }
+        } catch (TException e) {
+          intpProcess.releaseBrokenClient(client);
+          throw new ApplicationException(e);
+        } finally {
+          intpProcess.releaseClient(client);
         }
       }
     }
@@ -274,7 +286,7 @@ public class HeliumApplicationFactory implements 
ApplicationEventListener, Noteb
       }
     }
 
-    private void run(final ApplicationState app) throws ApplicationException {
+    private void run(ApplicationState app) throws ApplicationException {
       synchronized (app) {
         if (app.getStatus() != ApplicationState.Status.LOADED) {
           throw new ApplicationException(
@@ -291,19 +303,29 @@ public class HeliumApplicationFactory implements 
ApplicationEventListener, Noteb
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not 
running");
         }
-        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
-            new 
RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
-              @Override
-              public RemoteApplicationResult 
call(RemoteInterpreterService.Client client)
-                  throws Exception {
-                return client.runApplication(app.getId());
-              }
-            }
-        );
-        if (ret.isSuccess()) {
-          // success
-        } else {
-          throw new ApplicationException(ret.getMsg());
+        RemoteInterpreterService.Client client = null;
+        try {
+          client = intpProcess.getClient();
+        } catch (Exception e) {
+          throw new ApplicationException(e);
+        }
+
+        try {
+          RemoteApplicationResult ret = client.runApplication(app.getId());
+
+          if (ret.isSuccess()) {
+            // success
+          } else {
+            throw new ApplicationException(ret.getMsg());
+          }
+        } catch (TException e) {
+          intpProcess.releaseBrokenClient(client);
+          client = null;
+          throw new ApplicationException(e);
+        } finally {
+          if (client != null) {
+            intpProcess.releaseClient(client);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
new file mode 100644
index 0000000..9403b4f
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.NullArgumentException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.RepositoryException;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Manage interpreters.
+ */
+public class InterpreterFactory implements InterpreterGroupFactory {
+  private static final Logger logger = 
LoggerFactory.getLogger(InterpreterFactory.class);
+
+  private Map<String, URLClassLoader> cleanCl =
+      Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
+
+  private ZeppelinConfiguration conf;
+
+  private final InterpreterSettingManager interpreterSettingManager;
+  private AngularObjectRegistryListener angularObjectRegistryListener;
+  private final RemoteInterpreterProcessListener 
remoteInterpreterProcessListener;
+  private final ApplicationEventListener appEventListener;
+
+  private boolean shiroEnabled;
+
+  private Map<String, String> env = new HashMap<>();
+
+  private Interpreter devInterpreter;
+
+  public InterpreterFactory(ZeppelinConfiguration conf,
+      AngularObjectRegistryListener angularObjectRegistryListener,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appEventListener, DependencyResolver 
depResolver,
+      boolean shiroEnabled, InterpreterSettingManager 
interpreterSettingManager)
+      throws InterpreterException, IOException, RepositoryException {
+    this.conf = conf;
+    this.angularObjectRegistryListener = angularObjectRegistryListener;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.appEventListener = appEventListener;
+    this.shiroEnabled = shiroEnabled;
+
+    this.interpreterSettingManager = interpreterSettingManager;
+    //TODO(jl): Fix it not to use InterpreterGroupFactory
+    interpreterSettingManager.setInterpreterGroupFactory(this);
+
+    logger.info("shiroEnabled: {}", shiroEnabled);
+  }
+
+  /**
+   * @param id interpreterGroup id. Combination of interpreterSettingId + 
noteId/userId/shared
+   * depending on the interpreter mode
+   */
+  @Override
+  public InterpreterGroup createInterpreterGroup(String id, InterpreterOption 
option)
+      throws InterpreterException, NullArgumentException {
+
+    //When called from REST API without option we receive NPE
+    if (option == null) {
+      throw new NullArgumentException("option");
+    }
+
+    AngularObjectRegistry angularObjectRegistry;
+
+    InterpreterGroup interpreterGroup = new InterpreterGroup(id);
+    if (option.isRemote()) {
+      angularObjectRegistry =
+          new RemoteAngularObjectRegistry(id, angularObjectRegistryListener, 
interpreterGroup);
+    } else {
+      angularObjectRegistry = new AngularObjectRegistry(id, 
angularObjectRegistryListener);
+
+      // TODO(moon) : create distributed resource pool for local interpreters 
and set
+    }
+
+    interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+    return interpreterGroup;
+  }
+
+  public void createInterpretersForNote(InterpreterSetting interpreterSetting, 
String user,
+      String noteId, String interpreterSessionKey) {
+    InterpreterGroup interpreterGroup = 
interpreterSetting.getInterpreterGroup(user, noteId);
+    InterpreterOption option = interpreterSetting.getOption();
+    Properties properties = interpreterSetting.getFlatProperties();
+    // if interpreters are already there, wait until they have been removed
+    synchronized (interpreterGroup) {
+      long interpreterRemovalWaitStart = System.nanoTime();
+      // the interpreter process is supposed to be terminated by 
RemoteInterpreterProcess.dereference()
+      // within ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination 
of the process and
+      // removal from the interpreter group take too long, throw an error.
+      long minTimeout = 10L * 1000 * 1000000; // 10 sec
+      long interpreterRemovalWaitTimeout = Math.max(minTimeout,
+          conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 
1000000L * 2);
+      while (interpreterGroup.containsKey(interpreterSessionKey)) {
+        if (System.nanoTime() - interpreterRemovalWaitStart > 
interpreterRemovalWaitTimeout) {
+          throw new InterpreterException("Can not create interpreter");
+        }
+        try {
+          interpreterGroup.wait(1000);
+        } catch (InterruptedException e) {
+          logger.debug(e.getMessage(), e);
+        }
+      }
+    }
+
+    logger.info("Create interpreter instance {} for note {}", 
interpreterSetting.getName(), noteId);
+
+    List<InterpreterInfo> interpreterInfos = 
interpreterSetting.getInterpreterInfos();
+    String path = interpreterSetting.getPath();
+    InterpreterRunner runner = interpreterSetting.getInterpreterRunner();
+    Interpreter interpreter;
+    for (InterpreterInfo info : interpreterInfos) {
+      if (option.isRemote()) {
+        if (option.isExistingProcess()) {
+          interpreter =
+              connectToRemoteRepl(interpreterSessionKey, info.getClassName(), 
option.getHost(),
+                  option.getPort(), properties, interpreterSetting.getId(), 
user,
+                  option.isUserImpersonate);
+        } else {
+          interpreter = createRemoteRepl(path, interpreterSessionKey, 
info.getClassName(),
+              properties, interpreterSetting.getId(), user, 
option.isUserImpersonate(), runner);
+        }
+      } else {
+        interpreter = createRepl(interpreterSetting.getPath(), 
info.getClassName(), properties);
+      }
+
+      synchronized (interpreterGroup) {
+        List<Interpreter> interpreters = 
interpreterGroup.get(interpreterSessionKey);
+        if (null == interpreters) {
+          interpreters = new ArrayList<>();
+          interpreterGroup.put(interpreterSessionKey, interpreters);
+        }
+        if (info.isDefaultInterpreter()) {
+          interpreters.add(0, interpreter);
+        } else {
+          interpreters.add(interpreter);
+        }
+      }
+      logger.info("Interpreter {} {} created", interpreter.getClassName(), 
interpreter.hashCode());
+      interpreter.setInterpreterGroup(interpreterGroup);
+    }
+  }
+
+  private Interpreter createRepl(String dirName, String className, Properties 
property)
+      throws InterpreterException {
+    logger.info("Create repl {} from {}", className, dirName);
+
+    ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+    try {
+
+      URLClassLoader ccl = cleanCl.get(dirName);
+      if (ccl == null) {
+        // classloader fallback
+        ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
+      }
+
+      boolean separateCL = true;
+      try { // check if server's classloader has driver already.
+        Class cls = this.getClass().forName(className);
+        if (cls != null) {
+          separateCL = false;
+        }
+      } catch (Exception e) {
+        logger.error("exception checking server classloader driver", e);
+      }
+
+      URLClassLoader cl;
+
+      if (separateCL == true) {
+        cl = URLClassLoader.newInstance(new URL[]{}, ccl);
+      } else {
+        cl = ccl;
+      }
+      Thread.currentThread().setContextClassLoader(cl);
+
+      Class<Interpreter> replClass = (Class<Interpreter>) 
cl.loadClass(className);
+      Constructor<Interpreter> constructor =
+          replClass.getConstructor(new Class[]{Properties.class});
+      Interpreter repl = constructor.newInstance(property);
+      repl.setClassloaderUrls(ccl.getURLs());
+      LazyOpenInterpreter intp = new LazyOpenInterpreter(new 
ClassloaderInterpreter(repl, cl));
+      return intp;
+    } catch (SecurityException e) {
+      throw new InterpreterException(e);
+    } catch (NoSuchMethodException e) {
+      throw new InterpreterException(e);
+    } catch (IllegalArgumentException e) {
+      throw new InterpreterException(e);
+    } catch (InstantiationException e) {
+      throw new InterpreterException(e);
+    } catch (IllegalAccessException e) {
+      throw new InterpreterException(e);
+    } catch (InvocationTargetException e) {
+      throw new InterpreterException(e);
+    } catch (ClassNotFoundException e) {
+      throw new InterpreterException(e);
+    } finally {
+      Thread.currentThread().setContextClassLoader(oldcl);
+    }
+  }
+
+  private Interpreter connectToRemoteRepl(String interpreterSessionKey, String 
className,
+      String host, int port, Properties property, String interpreterSettingId, 
String userName,
+      Boolean isUserImpersonate) {
+    int connectTimeout = 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
+    String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + 
interpreterSettingId;
+    LazyOpenInterpreter intp = new LazyOpenInterpreter(
+        new RemoteInterpreter(property, interpreterSessionKey, className, 
host, port, localRepoPath,
+            connectTimeout, maxPoolSize, remoteInterpreterProcessListener, 
appEventListener,
+            userName, isUserImpersonate, 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
+    return intp;
+  }
+
+  Interpreter createRemoteRepl(String interpreterPath, String 
interpreterSessionKey,
+      String className, Properties property, String interpreterSettingId,
+      String userName, Boolean isUserImpersonate, InterpreterRunner 
interpreterRunner) {
+    int connectTimeout = 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + 
interpreterSettingId;
+    int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
+    String interpreterRunnerPath;
+    String interpreterGroupName = 
interpreterSettingManager.get(interpreterSettingId).getName();
+    if (null != interpreterRunner) {
+      interpreterRunnerPath = interpreterRunner.getPath();
+      Path p = Paths.get(interpreterRunnerPath);
+      if (!p.isAbsolute()) {
+        interpreterRunnerPath = Joiner.on(File.separator)
+            .join(interpreterPath, interpreterRunnerPath);
+      }
+    } else {
+      interpreterRunnerPath = conf.getInterpreterRemoteRunnerPath();
+    }
+
+    RemoteInterpreter remoteInterpreter =
+        new RemoteInterpreter(property, interpreterSessionKey, className,
+            interpreterRunnerPath, interpreterPath, localRepoPath, 
connectTimeout, maxPoolSize,
+            remoteInterpreterProcessListener, appEventListener, userName, 
isUserImpersonate,
+            conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT), 
interpreterGroupName);
+    remoteInterpreter.addEnv(env);
+
+    return new LazyOpenInterpreter(remoteInterpreter);
+  }
+
+  private List<Interpreter> createOrGetInterpreterList(String user, String 
noteId,
+      InterpreterSetting setting) {
+    InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, 
noteId);
+    synchronized (interpreterGroup) {
+      String interpreterSessionKey =
+          interpreterSettingManager.getInterpreterSessionKey(user, noteId, 
setting);
+      if (!interpreterGroup.containsKey(interpreterSessionKey)) {
+        createInterpretersForNote(setting, user, noteId, 
interpreterSessionKey);
+      }
+      return interpreterGroup.get(interpreterSessionKey);
+    }
+  }
+
+  private InterpreterSetting 
getInterpreterSettingByGroup(List<InterpreterSetting> settings,
+      String group) {
+    Preconditions.checkNotNull(group, "group should be not null");
+
+    for (InterpreterSetting setting : settings) {
+      if (group.equals(setting.getName())) {
+        return setting;
+      }
+    }
+    return null;
+  }
+
+  private String getInterpreterClassFromInterpreterSetting(InterpreterSetting 
setting,
+      String name) {
+    Preconditions.checkNotNull(name, "name should be not null");
+
+    for (InterpreterInfo info : setting.getInterpreterInfos()) {
+      String infoName = info.getName();
+      if (null != info.getName() && name.equals(infoName)) {
+        return info.getClassName();
+      }
+    }
+    return null;
+  }
+
+  private Interpreter getInterpreter(String user, String noteId, 
InterpreterSetting setting,
+      String name) {
+    Preconditions.checkNotNull(noteId, "noteId should be not null");
+    Preconditions.checkNotNull(setting, "setting should be not null");
+    Preconditions.checkNotNull(name, "name should be not null");
+
+    String className;
+    if (null != (className = 
getInterpreterClassFromInterpreterSetting(setting, name))) {
+      List<Interpreter> interpreterGroup = createOrGetInterpreterList(user, 
noteId, setting);
+      for (Interpreter interpreter : interpreterGroup) {
+        if (className.equals(interpreter.getClassName())) {
+          return interpreter;
+        }
+      }
+    }
+    return null;
+  }
+
+  public Interpreter getInterpreter(String user, String noteId, String 
replName) {
+    List<InterpreterSetting> settings = 
interpreterSettingManager.getInterpreterSettings(noteId);
+    InterpreterSetting setting;
+    Interpreter interpreter;
+
+    if (settings == null || settings.size() == 0) {
+      return null;
+    }
+
+    if (replName == null || replName.trim().length() == 0) {
+      // get default settings (first available)
+      // TODO(jl): Fix it in case of returning null
+      InterpreterSetting defaultSettings = interpreterSettingManager
+          .getDefaultInterpreterSetting(settings);
+      return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
+    }
+
+    String[] replNameSplit = replName.split("\\.");
+    if (replNameSplit.length == 2) {
+      String group = null;
+      String name = null;
+      group = replNameSplit[0];
+      name = replNameSplit[1];
+
+      setting = getInterpreterSettingByGroup(settings, group);
+
+      if (null != setting) {
+        interpreter = getInterpreter(user, noteId, setting, name);
+
+        if (null != interpreter) {
+          return interpreter;
+        }
+      }
+
+      throw new InterpreterException(replName + " interpreter not found");
+
+    } else {
+      // first assume replName is 'name' of interpreter. ('groupName' is 
omitted)
+      // search 'name' from first (default) interpreter group
+      // TODO(jl): Handle with noteId to support defaultInterpreter per note.
+      setting = 
interpreterSettingManager.getDefaultInterpreterSetting(settings);
+
+      interpreter = getInterpreter(user, noteId, setting, replName);
+
+      if (null != interpreter) {
+        return interpreter;
+      }
+
+      // next, assume replName is 'group' of interpreter ('name' is omitted)
+      // search interpreter group and return first interpreter.
+      setting = getInterpreterSettingByGroup(settings, replName);
+
+      if (null != setting) {
+        List<Interpreter> interpreters = createOrGetInterpreterList(user, 
noteId, setting);
+        if (null != interpreters) {
+          return interpreters.get(0);
+        }
+      }
+
+      // Support the legacy way to use it
+      for (InterpreterSetting s : settings) {
+        if (s.getGroup().equals(replName)) {
+          List<Interpreter> interpreters = createOrGetInterpreterList(user, 
noteId, s);
+          if (null != interpreters) {
+            return interpreters.get(0);
+          }
+        }
+      }
+    }
+
+    return null;
+  }
+
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  public void setEnv(Map<String, String> env) {
+    this.env = env;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
new file mode 100644
index 0000000..3b9be40
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.lang.NullArgumentException;
+
+/**
+ * Factory that creates an InterpreterGroup.
+ */
+public interface InterpreterGroupFactory {
+  InterpreterGroup createInterpreterGroup(String interpreterGroupId, 
InterpreterOption option);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfo.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfo.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfo.java
new file mode 100644
index 0000000..fd632ce
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+/**
+ * Information of interpreters in this interpreter setting.
+ * this will be serialized for conf/interpreter.json and REST api response.
+ */
+public class InterpreterInfo {
+  private String name;
+  @SerializedName("class") private String className;
+  private boolean defaultInterpreter = false;
+  private Map<String, Object> editor;
+
+  public InterpreterInfo(String className, String name, boolean 
defaultInterpreter,
+      Map<String, Object> editor) {
+    this.className = className;
+    this.name = name;
+    this.defaultInterpreter = defaultInterpreter;
+    this.editor = editor;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  boolean isDefaultInterpreter() {
+    return defaultInterpreter;
+  }
+
+  public Map<String, Object> getEditor() {
+    return editor;
+  }
+
+  public void setEditor(Map<String, Object> editor) {
+    this.editor = editor;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof InterpreterInfo)) {
+      return false;
+    }
+    InterpreterInfo other = (InterpreterInfo) obj;
+
+    boolean sameName =
+        null == getName() ? null == other.getName() : 
getName().equals(other.getName());
+    boolean sameClassName = null == getClassName() ?
+        null == other.getClassName() :
+        getClassName().equals(other.getClassName());
+    boolean sameIsDefaultInterpreter = defaultInterpreter == 
other.isDefaultInterpreter();
+
+    return sameName && sameClassName && sameIsDefaultInterpreter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
new file mode 100644
index 0000000..ca688dc
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.zeppelin.common.JsonSerializable;
+import org.sonatype.aether.repository.RemoteRepository;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JSON-serializable holder of interpreter settings, interpreter bindings and repositories.
+ */
+public class InterpreterInfoSaving implements JsonSerializable {
+
+  private static final Gson gson = new 
GsonBuilder().setPrettyPrinting().create();
+
+  public Map<String, InterpreterSetting> interpreterSettings;
+  public Map<String, List<String>> interpreterBindings;
+  public List<RemoteRepository> interpreterRepositories;
+
+  public String toJson() {
+    return gson.toJson(this);
+  }
+
+  public static InterpreterInfoSaving fromJson(String json) {
+    return gson.fromJson(json, InterpreterInfoSaving.class);
+  }
+}

Reply via email to