Repository: zeppelin
Updated Branches:
  refs/heads/master 1a3d1d185 -> 93a9aefc9


ZEPPELIN-3218. Plugins for Interpreter Launcher

### What is this PR for?
Move the interpreter launchers into `zeppelin-plugins` so that they are loaded through the `PluginManager` instead of being hard-coded in zeppelin-zengine.
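With this change `InterpreterSetting` no longer instantiates `SparkInterpreterLauncher` / `ShellScriptLauncher` directly; it asks the `PluginManager` for a launcher plugin by name. A minimal sketch of the new loading path, taken from the diff below:

```java
// InterpreterSetting (zeppelin-zengine): launcher creation is delegated to the PluginManager.
private void createLauncher() throws IOException {
  // getLauncherPlugin() returns "SparkInterpreterLauncher" for the spark group
  // and "StandardInterpreterLauncher" for all other interpreter groups.
  this.launcher = PluginManager.get().loadInterpreterLauncher(getLauncherPlugin(), recoveryStorage);
}
```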

### What type of PR is it?
[Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3218

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3043 from zjffdu/ZEPPELIN-3218 and squashes the following commits:

45ee5844a [Jeff Zhang] ZEPPELIN-3218. Plugins for Interpreter Launcher


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/93a9aefc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/93a9aefc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/93a9aefc

Branch: refs/heads/master
Commit: 93a9aefc9d7cfcfd01a7b308d27695460ec847f4
Parents: 1a3d1d1
Author: Jeff Zhang <zjf...@apache.org>
Authored: Mon Jun 25 15:40:41 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Fri Jun 29 08:53:57 2018 +0800

----------------------------------------------------------------------
 zeppelin-plugins/launcher/spark/pom.xml         |  58 +++++
 .../launcher/SparkInterpreterLauncher.java      | 226 +++++++++++++++++++
 .../launcher/SparkInterpreterLauncherTest.java  | 192 ++++++++++++++++
 zeppelin-plugins/launcher/standard/pom.xml      |  50 ++++
 .../launcher/StandardInterpreterLauncher.java   | 100 ++++++++
 .../StandardInterpreterLauncherTest.java        |  86 +++++++
 zeppelin-plugins/pom.xml                        |   3 +
 .../zeppelin/rest/AbstractTestRestApi.java      |   2 +
 .../interpreter/InterpreterSetting.java         |  20 +-
 .../launcher/ShellScriptLauncher.java           | 100 --------
 .../launcher/SparkInterpreterLauncher.java      | 226 -------------------
 .../apache/zeppelin/plugin/PluginManager.java   |  86 +++++--
 .../launcher/ShellScriptLauncherTest.java       |  86 -------
 .../launcher/SparkInterpreterLauncherTest.java  | 192 ----------------
 14 files changed, 795 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/spark/pom.xml b/zeppelin-plugins/launcher/spark/pom.xml
new file mode 100644
index 0000000..88384eb
--- /dev/null
+++ b/zeppelin-plugins/launcher/spark/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>zengine-plugins-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../../../zeppelin-plugins</relativePath>
+    </parent>
+
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>launcher-spark</artifactId>
+    <packaging>jar</packaging>
+    <version>0.9.0-SNAPSHOT</version>
+    <name>Zeppelin: Plugin SparkInterpreterLauncher</name>
+    <description>Spark Launcher implementation based on shell script interpreter.sh</description>
+
+    <properties>
+        <plugin.name>Launcher/SparkInterpreterLauncher</plugin.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>launcher-standard</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
new file mode 100644
index 0000000..ab95e0b
--- /dev/null
+++ b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Spark specific launcher.
+ */
+public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkInterpreterLauncher.class);
+
+  public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage 
recoveryStorage) {
+    super(zConf, recoveryStorage);
+  }
+
+  @Override
+  protected Map<String, String> 
buildEnvFromProperties(InterpreterLaunchContext context) {
+    Map<String, String> env = new HashMap<String, String>();
+    Properties sparkProperties = new Properties();
+    String sparkMaster = getSparkMaster(properties);
+    for (String key : properties.stringPropertyNames()) {
+      if (RemoteInterpreterUtils.isEnvString(key)) {
+        env.put(key, properties.getProperty(key));
+      }
+      if (isSparkConf(key, properties.getProperty(key))) {
+        sparkProperties.setProperty(key, 
toShellFormat(properties.getProperty(key)));
+      }
+    }
+
+    setupPropertiesForPySpark(sparkProperties);
+    setupPropertiesForSparkR(sparkProperties);
+    if (isYarnMode() && getDeployMode().equals("cluster")) {
+      env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
+      sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", 
"false");
+    }
+
+    StringBuilder sparkConfBuilder = new StringBuilder();
+    if (sparkMaster != null) {
+      sparkConfBuilder.append(" --master " + sparkMaster);
+    }
+    if (isYarnMode() && getDeployMode().equals("cluster")) {
+      sparkConfBuilder.append(" --files " + zConf.getConfDir() + 
"/log4j_yarn_cluster.properties");
+    }
+    for (String name : sparkProperties.stringPropertyNames()) {
+      sparkConfBuilder.append(" --conf " + name + "=" + 
sparkProperties.getProperty(name));
+    }
+    String useProxyUserEnv = 
System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
+    if (context.getOption().isUserImpersonate() && 
(StringUtils.isBlank(useProxyUserEnv) ||
+        !useProxyUserEnv.equals("false"))) {
+      sparkConfBuilder.append(" --proxy-user " + context.getUserName());
+    }
+
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+
+    // set these env in the order of
+    // 1. interpreter-setting
+    // 2. zeppelin-env.sh
+    // It is encouraged to set env in interpreter setting, but just for 
backward compatability,
+    // we also fallback to zeppelin-env.sh if it is not specified in 
interpreter setting.
+    for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", 
"HADOOP_CONF_DIR"})  {
+      String envValue = getEnv(envName);
+      if (envValue != null) {
+        env.put(envName, envValue);
+      }
+    }
+
+    String keytab = 
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
+    String principal =
+        
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
+
+    if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
+      env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab);
+      env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal);
+      LOGGER.info("Run Spark under secure mode with keytab: " + keytab +
+          ", principal: " + principal);
+    } else {
+      LOGGER.info("Run Spark under non-secure mode as no keytab and principal 
is specified");
+    }
+    LOGGER.debug("buildEnvFromProperties: " + env);
+    return env;
+
+  }
+
+
+  /**
+   * get environmental variable in the following order
+   *
+   * 1. interpreter setting
+   * 2. zeppelin-env.sh
+   *
+   */
+  private String getEnv(String envName) {
+    String env = properties.getProperty(envName);
+    if (env == null) {
+      env = System.getenv(envName);
+    }
+    return env;
+  }
+
+  private boolean isSparkConf(String key, String value) {
+    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && 
!StringUtils.isEmpty(value);
+  }
+
+  private void setupPropertiesForPySpark(Properties sparkProperties) {
+    if (isYarnMode()) {
+      sparkProperties.setProperty("spark.yarn.isPython", "true");
+    }
+  }
+
+  private void mergeSparkProperty(Properties sparkProperties, String 
propertyName,
+                                  String propertyValue) {
+    if (sparkProperties.containsKey(propertyName)) {
+      String oldPropertyValue = sparkProperties.getProperty(propertyName);
+      sparkProperties.setProperty(propertyName, oldPropertyValue + "," + 
propertyValue);
+    } else {
+      sparkProperties.setProperty(propertyName, propertyValue);
+    }
+  }
+
+  private void setupPropertiesForSparkR(Properties sparkProperties) {
+    String sparkHome = getEnv("SPARK_HOME");
+    File sparkRBasePath = null;
+    if (sparkHome == null) {
+      if (!getSparkMaster(properties).startsWith("local")) {
+        throw new RuntimeException("SPARK_HOME is not specified in 
interpreter-setting" +
+            " for non-local mode, if you specify it in zeppelin-env.sh, please 
move that into " +
+            " interpreter setting");
+      }
+      String zeppelinHome = 
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+      sparkRBasePath = new File(zeppelinHome,
+          "interpreter" + File.separator + "spark" + File.separator + "R");
+    } else {
+      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
+    }
+
+    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+    if (sparkRPath.exists() && sparkRPath.isFile()) {
+      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
+          sparkRPath.getAbsolutePath() + "#sparkr");
+    } else {
+      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+    }
+  }
+
+  /**
+   * Order to look for spark master
+   * 1. master in interpreter setting
+   * 2. spark.master interpreter setting
+   * 3. use local[*]
+   * @param properties
+   * @return
+   */
+  private String getSparkMaster(Properties properties) {
+    String master = properties.getProperty("master");
+    if (master == null) {
+      master = properties.getProperty("spark.master");
+      if (master == null) {
+        master = "local[*]";
+      }
+    }
+    return master;
+  }
+
+  private String getDeployMode() {
+    String master = getSparkMaster(properties);
+    if (master.equals("yarn-client")) {
+      return "client";
+    } else if (master.equals("yarn-cluster")) {
+      return "cluster";
+    } else if (master.startsWith("local")) {
+      return "client";
+    } else {
+      String deployMode = properties.getProperty("spark.submit.deployMode");
+      if (deployMode == null) {
+        throw new RuntimeException("master is set as yarn, but 
spark.submit.deployMode " +
+            "is not specified");
+      }
+      if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
+        throw new RuntimeException("Invalid value for spark.submit.deployMode: 
" + deployMode);
+      }
+      return deployMode;
+    }
+  }
+
+  private boolean isYarnMode() {
+    return getSparkMaster(properties).startsWith("yarn");
+  }
+
+  private String toShellFormat(String value) {
+    if (value.contains("'") && value.contains("\"")) {
+      throw new RuntimeException("Spark property value could not contain both 
\" and '");
+    } else if (value.contains("'")) {
+      return "\"" + value + "\"";
+    } else {
+      return "'" + value + "'";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
new file mode 100644
index 0000000..c2abd60
--- /dev/null
+++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkInterpreterLauncherTest {
+  @Before
+  public void setUp() {
+    for (final ZeppelinConfiguration.ConfVars confVar : 
ZeppelinConfiguration.ConfVars.values()) {
+      System.clearProperty(confVar.getVarName());
+    }
+  }
+
+  @Test
+  public void testConnectTimeOut() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty(
+        
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(),
 "10000");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("name", interpreterProcess.getInterpreterSettingName());
+    assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
+    assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
+    assertEquals(10000, interpreterProcess.getConnectTimeout());
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 2);
+    assertEquals(true, interpreterProcess.isUserImpersonated());
+  }
+
+  @Test
+  public void testLocalMode() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "local[*]");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 2);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(" --master local[*] --conf spark.files='file_1' --conf 
spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+  }
+
+  @Test
+  public void testYarnClientMode_1() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "yarn-client");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 2);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(" --master yarn-client --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+  }
+
+  @Test
+  public void testYarnClientMode_2() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.submit.deployMode", "client");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 2);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(" --master yarn --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf 
spark.yarn.isPython=true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+  }
+
+  @Test
+  public void testYarnClusterMode_1() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "yarn-cluster");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 3);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
+    assertEquals(" --master yarn-cluster --files 
.//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true --conf 
spark.yarn.submit.waitAppCompletion=false", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+  }
+
+  @Test
+  public void testYarnClusterMode_2() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.submit.deployMode", "cluster");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 3);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
+    assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties 
--conf spark.files='file_1' --conf spark.jars='jar_1' --conf 
spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf 
spark.yarn.submit.waitAppCompletion=false --proxy-user user1", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/pom.xml b/zeppelin-plugins/launcher/standard/pom.xml
new file mode 100644
index 0000000..da961e9
--- /dev/null
+++ b/zeppelin-plugins/launcher/standard/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>zengine-plugins-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../../../zeppelin-plugins</relativePath>
+    </parent>
+
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>launcher-standard</artifactId>
+    <packaging>jar</packaging>
+    <version>0.9.0-SNAPSHOT</version>
+    <name>Zeppelin: Plugin StandardLauncher</name>
+    <description>Launcher implementation based on shell script interpreter.sh</description>
+
+    <properties>
+        <plugin.name>Launcher/StandardInterpreterLauncher</plugin.name>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
new file mode 100644
index 0000000..10ab354
--- /dev/null
+++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Interpreter Launcher which use shell script to launch the interpreter 
process.
+ */
+public class StandardInterpreterLauncher extends InterpreterLauncher {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardInterpreterLauncher.class);
+
+  public StandardInterpreterLauncher(ZeppelinConfiguration zConf, 
RecoveryStorage recoveryStorage) {
+    super(zConf, recoveryStorage);
+  }
+
+  @Override
+  public InterpreterClient launch(InterpreterLaunchContext context) throws 
IOException {
+    LOGGER.info("Launching Interpreter: " + 
context.getInterpreterSettingGroup());
+    this.properties = context.getProperties();
+    InterpreterOption option = context.getOption();
+    InterpreterRunner runner = context.getRunner();
+    String groupName = context.getInterpreterSettingGroup();
+    String name = context.getInterpreterSettingName();
+    int connectTimeout = getConnectTimeout();
+
+    if (option.isExistingProcess()) {
+      return new RemoteInterpreterRunningProcess(
+          context.getInterpreterSettingName(),
+          connectTimeout,
+          option.getHost(),
+          option.getPort());
+    } else {
+      // try to recover it first
+      if (zConf.isRecoveryEnabled()) {
+        InterpreterClient recoveredClient =
+            
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
+        if (recoveredClient != null) {
+          if (recoveredClient.isRunning()) {
+            LOGGER.info("Recover interpreter process: " + 
recoveredClient.getHost() + ":" +
+                recoveredClient.getPort());
+            return recoveredClient;
+          } else {
+            LOGGER.warn("Cannot recover interpreter process: " + 
recoveredClient.getHost() + ":"
+                + recoveredClient.getPort() + ", as it is already 
terminated.");
+          }
+        }
+      }
+
+      // create new remote process
+      String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+          + context.getInterpreterSettingId();
+      return new RemoteInterpreterManagedProcess(
+          runner != null ? runner.getPath() : 
zConf.getInterpreterRemoteRunnerPath(),
+          context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), 
zConf.getInterpreterPortRange(),
+          zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
+          buildEnvFromProperties(context), connectTimeout, name,
+          context.getInterpreterGroupId(), option.isUserImpersonate());
+    }
+  }
+
+  protected Map<String, String> 
buildEnvFromProperties(InterpreterLaunchContext context) {
+    Map<String, String> env = new HashMap<>();
+    for (Object key : context.getProperties().keySet()) {
+      if (RemoteInterpreterUtils.isEnvString((String) key)) {
+        env.put((String) key, context.getProperties().getProperty((String) 
key));
+      }
+    }
+    return env;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
new file mode 100644
index 0000000..1861636
--- /dev/null
+++ b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StandardInterpreterLauncherTest {
+  @Before
+  public void setUp() {
+    for (final ZeppelinConfiguration.ConfVars confVar : 
ZeppelinConfiguration.ConfVars.values()) {
+      System.clearProperty(confVar.getVarName());
+    }
+  }
+
+  @Test
+  public void testLauncher() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    StandardInterpreterLauncher launcher = new 
StandardInterpreterLauncher(zConf, null);
+    Properties properties = new Properties();
+    properties.setProperty("ENV_1", "VALUE_1");
+    properties.setProperty("property_1", "value_1");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("name", interpreterProcess.getInterpreterSettingName());
+    assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
+    assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
+    
assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
+        interpreterProcess.getConnectTimeout());
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertEquals(1, interpreterProcess.getEnv().size());
+    assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
+    assertEquals(true, interpreterProcess.isUserImpersonated());
+  }
+
+  @Test
+  public void testConnectTimeOut() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    StandardInterpreterLauncher launcher = new 
StandardInterpreterLauncher(zConf, null);
+    Properties properties = new Properties();
+    properties.setProperty(
+        
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(),
 "10000");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("name", interpreterProcess.getInterpreterSettingName());
+    assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
+    assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
+    assertEquals(10000, interpreterProcess.getConnectTimeout());
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertEquals(0, interpreterProcess.getEnv().size());
+    assertEquals(true, interpreterProcess.isUserImpersonated());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index abbb602..f67ac9d 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -46,6 +46,9 @@
         <module>notebookrepo/mongodb</module>
         <module>notebookrepo/zeppelin-hub</module>
         <module>notebookrepo/filesystem</module>
+
+        <module>launcher/standard</module>
+        <module>launcher/spark</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 172f117..80165ff 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -35,6 +35,7 @@ import org.apache.commons.httpclient.methods.PutMethod;
 import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.plugin.PluginManager;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
@@ -292,6 +293,7 @@ public abstract class AbstractTestRestApi {
       LOG.info("Terminating test Zeppelin...");
       ZeppelinServer.jettyWebServer.stop();
       executor.shutdown();
+      PluginManager.reset();
 
       long s = System.currentTimeMillis();
       boolean started = true;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index b5596e2..3278e2f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -26,7 +26,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.annotations.SerializedName;
 import com.google.gson.internal.StringMap;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.Dependency;
 import org.apache.zeppelin.dep.DependencyResolver;
@@ -35,8 +34,6 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
 import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
-import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
-import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
 import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
 import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -44,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.plugin.PluginManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -293,12 +291,9 @@ public class InterpreterSetting {
     this.conf = o.getConf();
   }
 
-  private void createLauncher() {
-    if (group.equals("spark")) {
-      this.launcher = new SparkInterpreterLauncher(this.conf, 
this.recoveryStorage);
-    } else {
-      this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
-    }
+  private void createLauncher() throws IOException {
+    this.launcher = PluginManager.get().loadInterpreterLauncher(
+        getLauncherPlugin(), recoveryStorage);
   }
 
   public AngularObjectRegistryListener getAngularObjectRegistryListener() {
@@ -665,6 +660,13 @@ public class InterpreterSetting {
     runtimeInfosToBeCleared = null;
   }
 
+  public String getLauncherPlugin() {
+    if (group.equals("spark")) {
+      return "SparkInterpreterLauncher";
+    } else {
+      return "StandardInterpreterLauncher";
+    }
+  }
 
   //////////////////////////// IMPORTANT ////////////////////////////////////////////////
   ///////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
deleted file mode 100644
index 18a6dde..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterRunner;
-import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Interpreter Launcher which use shell script to launch the interpreter 
process.
- */
-public class ShellScriptLauncher extends InterpreterLauncher {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ShellScriptLauncher.class);
-
-  public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage 
recoveryStorage) {
-    super(zConf, recoveryStorage);
-  }
-
-  @Override
-  public InterpreterClient launch(InterpreterLaunchContext context) throws 
IOException {
-    LOGGER.info("Launching Interpreter: " + 
context.getInterpreterSettingGroup());
-    this.properties = context.getProperties();
-    InterpreterOption option = context.getOption();
-    InterpreterRunner runner = context.getRunner();
-    String groupName = context.getInterpreterSettingGroup();
-    String name = context.getInterpreterSettingName();
-    int connectTimeout = getConnectTimeout();
-
-    if (option.isExistingProcess()) {
-      return new RemoteInterpreterRunningProcess(
-          context.getInterpreterSettingName(),
-          connectTimeout,
-          option.getHost(),
-          option.getPort());
-    } else {
-      // try to recover it first
-      if (zConf.isRecoveryEnabled()) {
-        InterpreterClient recoveredClient =
-            
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
-        if (recoveredClient != null) {
-          if (recoveredClient.isRunning()) {
-            LOGGER.info("Recover interpreter process: " + 
recoveredClient.getHost() + ":" +
-                recoveredClient.getPort());
-            return recoveredClient;
-          } else {
-            LOGGER.warn("Cannot recover interpreter process: " + 
recoveredClient.getHost() + ":"
-                + recoveredClient.getPort() + ", as it is already 
terminated.");
-          }
-        }
-      }
-
-      // create new remote process
-      String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
-          + context.getInterpreterSettingId();
-      return new RemoteInterpreterManagedProcess(
-          runner != null ? runner.getPath() : 
zConf.getInterpreterRemoteRunnerPath(),
-          context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), 
zConf.getInterpreterPortRange(),
-          zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
-          buildEnvFromProperties(context), connectTimeout, name,
-          context.getInterpreterGroupId(), option.isUserImpersonate());
-    }
-  }
-
-  protected Map<String, String> 
buildEnvFromProperties(InterpreterLaunchContext context) {
-    Map<String, String> env = new HashMap<>();
-    for (Object key : context.getProperties().keySet()) {
-      if (RemoteInterpreterUtils.isEnvString((String) key)) {
-        env.put((String) key, context.getProperties().getProperty((String) 
key));
-      }
-    }
-    return env;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
deleted file mode 100644
index ff65e0d..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Spark specific launcher.
- */
-public class SparkInterpreterLauncher extends ShellScriptLauncher {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkInterpreterLauncher.class);
-
-  public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage 
recoveryStorage) {
-    super(zConf, recoveryStorage);
-  }
-
-  @Override
-  protected Map<String, String> 
buildEnvFromProperties(InterpreterLaunchContext context) {
-    Map<String, String> env = new HashMap<String, String>();
-    Properties sparkProperties = new Properties();
-    String sparkMaster = getSparkMaster(properties);
-    for (String key : properties.stringPropertyNames()) {
-      if (RemoteInterpreterUtils.isEnvString(key)) {
-        env.put(key, properties.getProperty(key));
-      }
-      if (isSparkConf(key, properties.getProperty(key))) {
-        sparkProperties.setProperty(key, 
toShellFormat(properties.getProperty(key)));
-      }
-    }
-
-    setupPropertiesForPySpark(sparkProperties);
-    setupPropertiesForSparkR(sparkProperties);
-    if (isYarnMode() && getDeployMode().equals("cluster")) {
-      env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
-      sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", 
"false");
-    }
-
-    StringBuilder sparkConfBuilder = new StringBuilder();
-    if (sparkMaster != null) {
-      sparkConfBuilder.append(" --master " + sparkMaster);
-    }
-    if (isYarnMode() && getDeployMode().equals("cluster")) {
-      sparkConfBuilder.append(" --files " + zConf.getConfDir() + 
"/log4j_yarn_cluster.properties");
-    }
-    for (String name : sparkProperties.stringPropertyNames()) {
-      sparkConfBuilder.append(" --conf " + name + "=" + 
sparkProperties.getProperty(name));
-    }
-    String useProxyUserEnv = 
System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
-    if (context.getOption().isUserImpersonate() && 
(StringUtils.isBlank(useProxyUserEnv) ||
-        !useProxyUserEnv.equals("false"))) {
-      sparkConfBuilder.append(" --proxy-user " + context.getUserName());
-    }
-
-    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
-
-    // set these env in the order of
-    // 1. interpreter-setting
-    // 2. zeppelin-env.sh
-    // It is encouraged to set env in interpreter setting, but just for 
backward compatability,
-    // we also fallback to zeppelin-env.sh if it is not specified in 
interpreter setting.
-    for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", 
"HADOOP_CONF_DIR"})  {
-      String envValue = getEnv(envName);
-      if (envValue != null) {
-        env.put(envName, envValue);
-      }
-    }
-
-    String keytab = 
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
-    String principal =
-        
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
-
-    if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
-      env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab);
-      env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal);
-      LOGGER.info("Run Spark under secure mode with keytab: " + keytab +
-          ", principal: " + principal);
-    } else {
-      LOGGER.info("Run Spark under non-secure mode as no keytab and principal 
is specified");
-    }
-    LOGGER.debug("buildEnvFromProperties: " + env);
-    return env;
-
-  }
-
-
-  /**
-   * get environmental variable in the following order
-   *
-   * 1. interpreter setting
-   * 2. zeppelin-env.sh
-   *
-   */
-  private String getEnv(String envName) {
-    String env = properties.getProperty(envName);
-    if (env == null) {
-      env = System.getenv(envName);
-    }
-    return env;
-  }
-
-  private boolean isSparkConf(String key, String value) {
-    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && 
!StringUtils.isEmpty(value);
-  }
-
-  private void setupPropertiesForPySpark(Properties sparkProperties) {
-    if (isYarnMode()) {
-      sparkProperties.setProperty("spark.yarn.isPython", "true");
-    }
-  }
-
-  private void mergeSparkProperty(Properties sparkProperties, String 
propertyName,
-                                  String propertyValue) {
-    if (sparkProperties.containsKey(propertyName)) {
-      String oldPropertyValue = sparkProperties.getProperty(propertyName);
-      sparkProperties.setProperty(propertyName, oldPropertyValue + "," + 
propertyValue);
-    } else {
-      sparkProperties.setProperty(propertyName, propertyValue);
-    }
-  }
-
-  private void setupPropertiesForSparkR(Properties sparkProperties) {
-    String sparkHome = getEnv("SPARK_HOME");
-    File sparkRBasePath = null;
-    if (sparkHome == null) {
-      if (!getSparkMaster(properties).startsWith("local")) {
-        throw new RuntimeException("SPARK_HOME is not specified in 
interpreter-setting" +
-            " for non-local mode, if you specify it in zeppelin-env.sh, please 
move that into " +
-            " interpreter setting");
-      }
-      String zeppelinHome = 
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
-      sparkRBasePath = new File(zeppelinHome,
-          "interpreter" + File.separator + "spark" + File.separator + "R");
-    } else {
-      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
-    }
-
-    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
-    if (sparkRPath.exists() && sparkRPath.isFile()) {
-      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
-          sparkRPath.getAbsolutePath() + "#sparkr");
-    } else {
-      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
-    }
-  }
-
-  /**
-   * Order to look for spark master
-   * 1. master in interpreter setting
-   * 2. spark.master in interpreter setting
-   * 3. use local[*]
-   * @param properties
-   * @return
-   */
-  private String getSparkMaster(Properties properties) {
-    String master = properties.getProperty("master");
-    if (master == null) {
-      master = properties.getProperty("spark.master");
-      if (master == null) {
-        master = "local[*]";
-      }
-    }
-    return master;
-  }
-
-  private String getDeployMode() {
-    String master = getSparkMaster(properties);
-    if (master.equals("yarn-client")) {
-      return "client";
-    } else if (master.equals("yarn-cluster")) {
-      return "cluster";
-    } else if (master.startsWith("local")) {
-      return "client";
-    } else {
-      String deployMode = properties.getProperty("spark.submit.deployMode");
-      if (deployMode == null) {
-        throw new RuntimeException("master is set as yarn, but 
spark.submit.deployMode " +
-            "is not specified");
-      }
-      if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
-        throw new RuntimeException("Invalid value for spark.submit.deployMode: 
" + deployMode);
-      }
-      return deployMode;
-    }
-  }
-
-  private boolean isYarnMode() {
-    return getSparkMaster(properties).startsWith("yarn");
-  }
-
-  private String toShellFormat(String value) {
-    if (value.contains("'") && value.contains("\"")) {
-      throw new RuntimeException("Spark property value could not contain both 
\" and '");
-    } else if (value.contains("'")) {
-      return "\"" + value + "\"";
-    } else {
-      return "'" + value + "'";
-    }
-  }
-
-}
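
For context, the deploy-mode logic removed above (and relocated into the new
zeppelin-plugins/launcher/spark module) resolves the Spark master from the
"master" property, then "spark.master", then falls back to local[*], and
derives the deploy mode from it. Below is a minimal standalone sketch of that
resolution order; the class and method names are hypothetical and not part of
the Zeppelin API.

import java.util.Properties;

// Illustrative sketch only: mirrors the master / deploy-mode resolution used
// by the launcher shown above. Names here are hypothetical.
public class SparkMasterResolutionSketch {

  // Lookup order: 1. "master" property, 2. "spark.master" property, 3. "local[*]".
  static String resolveMaster(Properties props) {
    String master = props.getProperty("master");
    if (master == null) {
      master = props.getProperty("spark.master", "local[*]");
    }
    return master;
  }

  // yarn-client and local* imply client mode, yarn-cluster implies cluster mode;
  // anything else (e.g. plain "yarn") must set spark.submit.deployMode explicitly.
  static String resolveDeployMode(Properties props) {
    String master = resolveMaster(props);
    if (master.equals("yarn-client") || master.startsWith("local")) {
      return "client";
    }
    if (master.equals("yarn-cluster")) {
      return "cluster";
    }
    String deployMode = props.getProperty("spark.submit.deployMode");
    if (deployMode == null
        || (!deployMode.equals("client") && !deployMode.equals("cluster"))) {
      throw new IllegalArgumentException(
          "spark.submit.deployMode must be 'client' or 'cluster' when master is " + master);
    }
    return deployMode;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("master", "yarn");
    props.setProperty("spark.submit.deployMode", "cluster");
    // Prints: yarn / cluster
    System.out.println(resolveMaster(props) + " / " + resolveDeployMode(props));
  }
}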

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
index f573b15..5f7dc1d 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
@@ -17,19 +17,24 @@
 
 package org.apache.zeppelin.plugin;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
-import java.util.ServiceLoader;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Class for loading Plugins
@@ -42,6 +47,8 @@ public class PluginManager {
   private ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
   private String pluginsDir = zConf.getPluginsDir();
 
+  private Map<String, InterpreterLauncher> cachedLaunchers = new HashMap<>();
+
   public static synchronized PluginManager get() {
     if (instance == null) {
       instance = new PluginManager();
@@ -65,27 +72,13 @@ public class PluginManager {
     }
 
     String simpleClassName = 
notebookRepoClassName.substring(notebookRepoClassName.lastIndexOf(".") + 1);
-    File pluginFolder = new File(pluginsDir + "/NotebookRepo/" + 
simpleClassName);
-    if (!pluginFolder.exists() || pluginFolder.isFile()) {
-      LOGGER.warn("pluginFolder " + pluginFolder.getAbsolutePath() +
-          " doesn't exist or is not a directory");
-      return null;
-    }
-    List<URL> urls = new ArrayList<>();
-    for (File file : pluginFolder.listFiles()) {
-      LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of 
plugin "
-          + notebookRepoClassName);
-      urls.add(file.toURI().toURL());
-    }
-    if (urls.isEmpty()) {
-      LOGGER.warn("Can not load plugin " + notebookRepoClassName +
-          ", because the plugin folder " + pluginFolder + " is empty.");
+    URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, 
"NotebookRepo", simpleClassName);
+    if (pluginClassLoader == null) {
       return null;
     }
-    URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[0]));
     NotebookRepo notebookRepo = null;
     try {
-      notebookRepo = (NotebookRepo) (Class.forName(notebookRepoClassName, 
true, classLoader)).newInstance();
+      notebookRepo = (NotebookRepo) (Class.forName(notebookRepoClassName, 
true, pluginClassLoader)).newInstance();
     } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
       LOGGER.warn("Fail to instantiate notebookrepo from plugin classpath:" + 
notebookRepoClassName, e);
     }
@@ -96,4 +89,59 @@ public class PluginManager {
     return notebookRepo;
   }
 
+  public synchronized InterpreterLauncher loadInterpreterLauncher(String 
launcherPlugin,
+                                                     RecoveryStorage 
recoveryStorage)
+      throws IOException {
+
+    if (cachedLaunchers.containsKey(launcherPlugin)) {
+      return cachedLaunchers.get(launcherPlugin);
+    }
+    LOGGER.info("Loading Interpreter Launcher Plugin: " + launcherPlugin);
+    URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, 
"Launcher", launcherPlugin);
+    String pluginClass = "org.apache.zeppelin.interpreter.launcher." + 
launcherPlugin;
+    InterpreterLauncher launcher = null;
+    try {
+      launcher = (InterpreterLauncher) (Class.forName(pluginClass, true, 
pluginClassLoader))
+          .getConstructor(ZeppelinConfiguration.class, RecoveryStorage.class)
+          .newInstance(zConf, recoveryStorage);
+    } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException
+        | NoSuchMethodException | InvocationTargetException e) {
+      LOGGER.warn("Fail to instantiate Launcher from plugin classpath:" + 
launcherPlugin, e);
+    }
+
+    if (launcher == null) {
+      throw new IOException("Fail to load plugin: " + launcherPlugin);
+    }
+    cachedLaunchers.put(launcherPlugin, launcher);
+    return launcher;
+  }
+
+  private URLClassLoader getPluginClassLoader(String pluginsDir,
+                                              String pluginType,
+                                              String pluginName) throws 
IOException {
+
+    File pluginFolder = new File(pluginsDir + "/" + pluginType + "/" + 
pluginName);
+    if (!pluginFolder.exists() || pluginFolder.isFile()) {
+      LOGGER.warn("PluginFolder " + pluginFolder.getAbsolutePath() +
+          " doesn't exist or is not a directory");
+      return null;
+    }
+    List<URL> urls = new ArrayList<>();
+    for (File file : pluginFolder.listFiles()) {
+      LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of 
plugin: "
+          + pluginName);
+      urls.add(file.toURI().toURL());
+    }
+    if (urls.isEmpty()) {
+      LOGGER.warn("Can not load plugin " + pluginName +
+          ", because the plugin folder " + pluginFolder + " is empty.");
+      return null;
+    }
+    return new URLClassLoader(urls.toArray(new URL[0]));
+  }
+
+  @VisibleForTesting
+  public static void reset() {
+    instance = null;
+  }
 }
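
The refactored PluginManager above now loads launchers by plugin name from
<plugins-dir>/Launcher/<name> and caches the resulting instances. A hedged
usage sketch of the new loadInterpreterLauncher API follows; it assumes a
SparkInterpreterLauncher plugin folder is present and that passing a null
RecoveryStorage is acceptable here, as it is in the launcher tests.

import java.io.IOException;

import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.plugin.PluginManager;

// Usage sketch only: mirrors how a launcher is expected to be obtained
// through the new plugin mechanism.
public class LauncherLoadingSketch {
  public static void main(String[] args) throws IOException {
    // "SparkInterpreterLauncher" must match both the folder under
    // <plugins-dir>/Launcher/ and the class name in
    // org.apache.zeppelin.interpreter.launcher.
    InterpreterLauncher launcher = PluginManager.get()
        .loadInterpreterLauncher("SparkInterpreterLauncher", null /* recoveryStorage */);

    // Subsequent calls with the same name return the cached instance.
    System.out.println("Loaded launcher: " + launcher.getClass().getName());
  }
}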

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
deleted file mode 100644
index ace3f31..0000000
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ShellScriptLauncherTest {
-  @Before
-  public void setUp() {
-    for (final ZeppelinConfiguration.ConfVars confVar : 
ZeppelinConfiguration.ConfVars.values()) {
-      System.clearProperty(confVar.getVarName());
-    }
-  }
-
-  @Test
-  public void testLauncher() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("ENV_1", "VALUE_1");
-    properties.setProperty("property_1", "value_1");
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
-    
assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
-        interpreterProcess.getConnectTimeout());
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertEquals(1, interpreterProcess.getEnv().size());
-    assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
-    assertEquals(true, interpreterProcess.isUserImpersonated());
-  }
-
-  @Test
-  public void testConnectTimeOut() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty(
-        
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(),
 "10000");
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
-    assertEquals(10000, interpreterProcess.getConnectTimeout());
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertEquals(0, interpreterProcess.getEnv().size());
-    assertEquals(true, interpreterProcess.isUserImpersonated());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
deleted file mode 100644
index c2abd60..0000000
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class SparkInterpreterLauncherTest {
-  @Before
-  public void setUp() {
-    for (final ZeppelinConfiguration.ConfVars confVar : 
ZeppelinConfiguration.ConfVars.values()) {
-      System.clearProperty(confVar.getVarName());
-    }
-  }
-
-  @Test
-  public void testConnectTimeOut() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty(
-        
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(),
 "10000");
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
-    assertEquals(10000, interpreterProcess.getConnectTimeout());
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals(true, interpreterProcess.isUserImpersonated());
-  }
-
-  @Test
-  public void testLocalMode() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "local[*]");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master local[*] --conf spark.files='file_1' --conf 
spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClientMode_1() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn-client");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master yarn-client --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClientMode_2() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
-    properties.setProperty("spark.submit.deployMode", "client");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master yarn --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf 
spark.yarn.isPython=true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClusterMode_1() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn-cluster");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn-cluster --files 
.//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true --conf 
spark.yarn.submit.waitAppCompletion=false", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClusterMode_2() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
-    properties.setProperty("spark.submit.deployMode", "cluster");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties 
--conf spark.files='file_1' --conf spark.jars='jar_1' --conf 
spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf 
spark.yarn.submit.waitAppCompletion=false --proxy-user user1", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-}
