Repository: zeppelin Updated Branches: refs/heads/master 1a3d1d185 -> 93a9aefc9
ZEPPELIN-3218. Plugins for Interpreter Launcher ### What is this PR for? Move launchers into Zeppelin plugins ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3218 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3043 from zjffdu/ZEPPELIN-3218 and squashes the following commits: 45ee5844a [Jeff Zhang] ZEPPELIN-3218. Plugins for Interpreter Launcher Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/93a9aefc Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/93a9aefc Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/93a9aefc Branch: refs/heads/master Commit: 93a9aefc9d7cfcfd01a7b308d27695460ec847f4 Parents: 1a3d1d1 Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Jun 25 15:40:41 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Jun 29 08:53:57 2018 +0800 ---------------------------------------------------------------------- zeppelin-plugins/launcher/spark/pom.xml | 58 +++++ .../launcher/SparkInterpreterLauncher.java | 226 +++++++++++++++++++ .../launcher/SparkInterpreterLauncherTest.java | 192 ++++++++++++++++ zeppelin-plugins/launcher/standard/pom.xml | 50 ++++ .../launcher/StandardInterpreterLauncher.java | 100 ++++++++ .../StandardInterpreterLauncherTest.java | 86 +++++++ zeppelin-plugins/pom.xml | 3 + .../zeppelin/rest/AbstractTestRestApi.java | 2 + .../interpreter/InterpreterSetting.java | 20 +- .../launcher/ShellScriptLauncher.java | 100 -------- .../launcher/SparkInterpreterLauncher.java | 226 ------------------- .../apache/zeppelin/plugin/PluginManager.java | 86 +++++-- .../launcher/ShellScriptLauncherTest.java | 86 ------- .../launcher/SparkInterpreterLauncherTest.java | 192 ---------------- 14 files changed, 795 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/spark/pom.xml b/zeppelin-plugins/launcher/spark/pom.xml new file mode 100644 index 0000000..88384eb --- /dev/null +++ b/zeppelin-plugins/launcher/spark/pom.xml @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License.
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zengine-plugins-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../../../zeppelin-plugins</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>launcher-spark</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: Plugin SparkInterpreterLauncher</name> + <description>Spark Launcher implementation based on shell script interpreter.sh</description> + + <properties> + <plugin.name>Launcher/SparkInterpreterLauncher</plugin.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>launcher-standard</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java new file mode 100644 index 0000000..ab95e0b --- /dev/null +++ b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Spark specific launcher. 
+ */ +public class SparkInterpreterLauncher extends StandardInterpreterLauncher { + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class); + + public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { + super(zConf, recoveryStorage); + } + + @Override + protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { + Map<String, String> env = new HashMap<String, String>(); + Properties sparkProperties = new Properties(); + String sparkMaster = getSparkMaster(properties); + for (String key : properties.stringPropertyNames()) { + if (RemoteInterpreterUtils.isEnvString(key)) { + env.put(key, properties.getProperty(key)); + } + if (isSparkConf(key, properties.getProperty(key))) { + sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key))); + } + } + + setupPropertiesForPySpark(sparkProperties); + setupPropertiesForSparkR(sparkProperties); + if (isYarnMode() && getDeployMode().equals("cluster")) { + env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true"); + sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false"); + } + + StringBuilder sparkConfBuilder = new StringBuilder(); + if (sparkMaster != null) { + sparkConfBuilder.append(" --master " + sparkMaster); + } + if (isYarnMode() && getDeployMode().equals("cluster")) { + sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + } + for (String name : sparkProperties.stringPropertyNames()) { + sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); + } + String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER"); + if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) || + !useProxyUserEnv.equals("false"))) { + sparkConfBuilder.append(" --proxy-user " + context.getUserName()); + } + + env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); + + // set these env in the order of + // 1. interpreter-setting + // 2. zeppelin-env.sh + // It is encouraged to set env in interpreter setting, but just for backward compatability, + // we also fallback to zeppelin-env.sh if it is not specified in interpreter setting. + for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) { + String envValue = getEnv(envName); + if (envValue != null) { + env.put(envName, envValue); + } + } + + String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB); + String principal = + zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL); + + if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) { + env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab); + env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal); + LOGGER.info("Run Spark under secure mode with keytab: " + keytab + + ", principal: " + principal); + } else { + LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified"); + } + LOGGER.debug("buildEnvFromProperties: " + env); + return env; + + } + + + /** + * get environmental variable in the following order + * + * 1. interpreter setting + * 2. 
zeppelin-env.sh + * + */ + private String getEnv(String envName) { + String env = properties.getProperty(envName); + if (env == null) { + env = System.getenv(envName); + } + return env; + } + + private boolean isSparkConf(String key, String value) { + return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); + } + + private void setupPropertiesForPySpark(Properties sparkProperties) { + if (isYarnMode()) { + sparkProperties.setProperty("spark.yarn.isPython", "true"); + } + } + + private void mergeSparkProperty(Properties sparkProperties, String propertyName, + String propertyValue) { + if (sparkProperties.containsKey(propertyName)) { + String oldPropertyValue = sparkProperties.getProperty(propertyName); + sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue); + } else { + sparkProperties.setProperty(propertyName, propertyValue); + } + } + + private void setupPropertiesForSparkR(Properties sparkProperties) { + String sparkHome = getEnv("SPARK_HOME"); + File sparkRBasePath = null; + if (sparkHome == null) { + if (!getSparkMaster(properties).startsWith("local")) { + throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" + + " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + + " interpreter setting"); + } + String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); + sparkRBasePath = new File(zeppelinHome, + "interpreter" + File.separator + "spark" + File.separator + "R"); + } else { + sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); + } + + File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); + if (sparkRPath.exists() && sparkRPath.isFile()) { + mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", + sparkRPath.getAbsolutePath() + "#sparkr"); + } else { + LOGGER.warn("sparkr.zip is not found, SparkR may not work."); + } + } + + /** + * Order to look for spark master + * 1. master in interpreter setting + * 2. spark.master interpreter setting + * 3. 
use local[*] + * @param properties + * @return + */ + private String getSparkMaster(Properties properties) { + String master = properties.getProperty("master"); + if (master == null) { + master = properties.getProperty("spark.master"); + if (master == null) { + master = "local[*]"; + } + } + return master; + } + + private String getDeployMode() { + String master = getSparkMaster(properties); + if (master.equals("yarn-client")) { + return "client"; + } else if (master.equals("yarn-cluster")) { + return "cluster"; + } else if (master.startsWith("local")) { + return "client"; + } else { + String deployMode = properties.getProperty("spark.submit.deployMode"); + if (deployMode == null) { + throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " + + "is not specified"); + } + if (!deployMode.equals("client") && !deployMode.equals("cluster")) { + throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode); + } + return deployMode; + } + } + + private boolean isYarnMode() { + return getSparkMaster(properties).startsWith("yarn"); + } + + private String toShellFormat(String value) { + if (value.contains("'") && value.contains("\"")) { + throw new RuntimeException("Spark property value could not contain both \" and '"); + } else if (value.contains("'")) { + return "\"" + value + "\""; + } else { + return "'" + value + "'"; + } + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java new file mode 100644 index 0000000..c2abd60 --- /dev/null +++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SparkInterpreterLauncherTest { + @Before + public void setUp() { + for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) { + System.clearProperty(confVar.getVarName()); + } + } + + @Test + public void testConnectTimeOut() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(10000, interpreterProcess.getConnectTimeout()); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 2); + assertEquals(true, interpreterProcess.isUserImpersonated()); + } + + @Test + public void testLocalMode() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "local[*]"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 2); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(" --master local[*] --conf 
spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + } + + @Test + public void testYarnClientMode_1() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "yarn-client"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 2); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + } + + @Test + public void testYarnClientMode_2() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "yarn"); + properties.setProperty("spark.submit.deployMode", "client"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 2); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + } + + @Test + public void testYarnClusterMode_1() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + 
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "yarn-cluster"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 3); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); + assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + } + + @Test + public void testYarnClusterMode_2() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "yarn"); + properties.setProperty("spark.submit.deployMode", "cluster"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 3); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); + assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/standard/pom.xml b/zeppelin-plugins/launcher/standard/pom.xml new file mode 100644 index 0000000..da961e9 --- /dev/null +++ b/zeppelin-plugins/launcher/standard/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zengine-plugins-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../../../zeppelin-plugins</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>launcher-standard</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: Plugin StandardLauncher</name> + <description>Launcher implementation based on shell script interpreter.sh</description> + + <properties> + <plugin.name>Launcher/StandardInterpreterLauncher</plugin.name> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java new file mode 100644 index 0000000..10ab354 --- /dev/null +++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterRunner; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Interpreter Launcher which use shell script to launch the interpreter process. + */ +public class StandardInterpreterLauncher extends InterpreterLauncher { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardInterpreterLauncher.class); + + public StandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { + super(zConf, recoveryStorage); + } + + @Override + public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); + this.properties = context.getProperties(); + InterpreterOption option = context.getOption(); + InterpreterRunner runner = context.getRunner(); + String groupName = context.getInterpreterSettingGroup(); + String name = context.getInterpreterSettingName(); + int connectTimeout = getConnectTimeout(); + + if (option.isExistingProcess()) { + return new RemoteInterpreterRunningProcess( + context.getInterpreterSettingName(), + connectTimeout, + option.getHost(), + option.getPort()); + } else { + // try to recover it first + if (zConf.isRecoveryEnabled()) { + InterpreterClient recoveredClient = + recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); + if (recoveredClient != null) { + if (recoveredClient.isRunning()) { + LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" + + recoveredClient.getPort()); + return recoveredClient; + } else { + LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":" + + recoveredClient.getPort() + ", as it is already terminated."); + } + } + } + + // create new remote process + String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + + context.getInterpreterSettingId(); + return new RemoteInterpreterManagedProcess( + runner != null ? 
runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), + context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(), + zConf.getInterpreterDir() + "/" + groupName, localRepoPath, + buildEnvFromProperties(context), connectTimeout, name, + context.getInterpreterGroupId(), option.isUserImpersonate()); + } + } + + protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { + Map<String, String> env = new HashMap<>(); + for (Object key : context.getProperties().keySet()) { + if (RemoteInterpreterUtils.isEnvString((String) key)) { + env.put((String) key, context.getProperties().getProperty((String) key)); + } + } + return env; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java new file mode 100644 index 0000000..1861636 --- /dev/null +++ b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StandardInterpreterLauncherTest { + @Before + public void setUp() { + for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) { + System.clearProperty(confVar.getVarName()); + } + } + + @Test + public void testLauncher() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + StandardInterpreterLauncher launcher = new StandardInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("ENV_1", "VALUE_1"); + properties.setProperty("property_1", "value_1"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(), + interpreterProcess.getConnectTimeout()); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertEquals(1, interpreterProcess.getEnv().size()); + assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1")); + assertEquals(true, interpreterProcess.isUserImpersonated()); + } + + @Test + public void testConnectTimeOut() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + StandardInterpreterLauncher launcher = new StandardInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(10000, interpreterProcess.getConnectTimeout()); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertEquals(0, interpreterProcess.getEnv().size()); + assertEquals(true, interpreterProcess.isUserImpersonated()); + } + +} 
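The two launcher modules above are ordinary jars that Zeppelin now loads as plugins: PluginManager (modified further down in this diff) locates a launcher by name under the configured plugins directory (zConf.getPluginsDir()) and instantiates it through an isolated URLClassLoader. The following is a minimal sketch of exercising that path directly; it is not part of this commit, it assumes zeppelin-zengine is on the classpath and that the launcher jars have already been staged under the plugins directory (e.g. plugins/Launcher/StandardInterpreterLauncher/), and the class name LauncherPluginSmokeCheck is purely illustrative.

    import java.io.IOException;

    import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
    import org.apache.zeppelin.plugin.PluginManager;

    public class LauncherPluginSmokeCheck {
      public static void main(String[] args) throws IOException {
        // PluginManager builds a URLClassLoader over <pluginsDir>/Launcher/StandardInterpreterLauncher/*.jar
        // and reflectively invokes the (ZeppelinConfiguration, RecoveryStorage) constructor;
        // passing a null RecoveryStorage mirrors what the launcher tests in this commit do.
        InterpreterLauncher launcher =
            PluginManager.get().loadInterpreterLauncher("StandardInterpreterLauncher", null);
        System.out.println("Loaded launcher: " + launcher.getClass().getName());

        // A second call with the same plugin name returns the cached instance instead of
        // creating another classloader (see cachedLaunchers in PluginManager below).
        InterpreterLauncher cached =
            PluginManager.get().loadInterpreterLauncher("StandardInterpreterLauncher", null);
        System.out.println("Same instance: " + (launcher == cached));
      }
    }

Passing "SparkInterpreterLauncher" instead selects the Spark-specific launcher, which is what the new InterpreterSetting.getLauncherPlugin() (further down in this diff) returns for the spark interpreter group.
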
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml index abbb602..f67ac9d 100644 --- a/zeppelin-plugins/pom.xml +++ b/zeppelin-plugins/pom.xml @@ -46,6 +46,9 @@ <module>notebookrepo/mongodb</module> <module>notebookrepo/zeppelin-hub</module> <module>notebookrepo/filesystem</module> + + <module>launcher/standard</module> + <module>launcher/spark</module> </modules> <dependencies> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 172f117..80165ff 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -35,6 +35,7 @@ import org.apache.commons.httpclient.methods.PutMethod; import org.apache.commons.httpclient.methods.RequestEntity; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.plugin.PluginManager; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -292,6 +293,7 @@ public abstract class AbstractTestRestApi { LOG.info("Terminating test Zeppelin..."); ZeppelinServer.jettyWebServer.stop(); executor.shutdown(); + PluginManager.reset(); long s = System.currentTimeMillis(); boolean started = true; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index b5596e2..3278e2f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -26,7 +26,6 @@ import com.google.gson.JsonObject; import com.google.gson.annotations.SerializedName; import com.google.gson.internal.StringMap; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.DependencyResolver; @@ -35,8 +34,6 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext; import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; -import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher; -import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher; import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager; import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; @@ -44,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.plugin.PluginManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -293,12 +291,9 @@ public class InterpreterSetting { this.conf = o.getConf(); } - private void createLauncher() { - if (group.equals("spark")) { - this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage); - } else { - this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage); - } + private void createLauncher() throws IOException { + this.launcher = PluginManager.get().loadInterpreterLauncher( + getLauncherPlugin(), recoveryStorage); } public AngularObjectRegistryListener getAngularObjectRegistryListener() { @@ -665,6 +660,13 @@ public class InterpreterSetting { runtimeInfosToBeCleared = null; } + public String getLauncherPlugin() { + if (group.equals("spark")) { + return "SparkInterpreterLauncher"; + } else { + return "StandardInterpreterLauncher"; + } + } //////////////////////////// IMPORTANT //////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java deleted file mode 100644 index 18a6dde..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.zeppelin.interpreter.launcher; - -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.InterpreterRunner; -import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Interpreter Launcher which use shell script to launch the interpreter process. 
- */ -public class ShellScriptLauncher extends InterpreterLauncher { - - private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class); - - public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - super(zConf, recoveryStorage); - } - - @Override - public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { - LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); - this.properties = context.getProperties(); - InterpreterOption option = context.getOption(); - InterpreterRunner runner = context.getRunner(); - String groupName = context.getInterpreterSettingGroup(); - String name = context.getInterpreterSettingName(); - int connectTimeout = getConnectTimeout(); - - if (option.isExistingProcess()) { - return new RemoteInterpreterRunningProcess( - context.getInterpreterSettingName(), - connectTimeout, - option.getHost(), - option.getPort()); - } else { - // try to recover it first - if (zConf.isRecoveryEnabled()) { - InterpreterClient recoveredClient = - recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); - if (recoveredClient != null) { - if (recoveredClient.isRunning()) { - LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" + - recoveredClient.getPort()); - return recoveredClient; - } else { - LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":" - + recoveredClient.getPort() + ", as it is already terminated."); - } - } - } - - // create new remote process - String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" - + context.getInterpreterSettingId(); - return new RemoteInterpreterManagedProcess( - runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), - context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(), - zConf.getInterpreterDir() + "/" + groupName, localRepoPath, - buildEnvFromProperties(context), connectTimeout, name, - context.getInterpreterGroupId(), option.isUserImpersonate()); - } - } - - protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { - Map<String, String> env = new HashMap<>(); - for (Object key : context.getProperties().keySet()) { - if (RemoteInterpreterUtils.isEnvString((String) key)) { - env.put((String) key, context.getProperties().getProperty((String) key)); - } - } - return env; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java deleted file mode 100644 index ff65e0d..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.launcher; - -import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * Spark specific launcher. - */ -public class SparkInterpreterLauncher extends ShellScriptLauncher { - - private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class); - - public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - super(zConf, recoveryStorage); - } - - @Override - protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { - Map<String, String> env = new HashMap<String, String>(); - Properties sparkProperties = new Properties(); - String sparkMaster = getSparkMaster(properties); - for (String key : properties.stringPropertyNames()) { - if (RemoteInterpreterUtils.isEnvString(key)) { - env.put(key, properties.getProperty(key)); - } - if (isSparkConf(key, properties.getProperty(key))) { - sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key))); - } - } - - setupPropertiesForPySpark(sparkProperties); - setupPropertiesForSparkR(sparkProperties); - if (isYarnMode() && getDeployMode().equals("cluster")) { - env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true"); - sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false"); - } - - StringBuilder sparkConfBuilder = new StringBuilder(); - if (sparkMaster != null) { - sparkConfBuilder.append(" --master " + sparkMaster); - } - if (isYarnMode() && getDeployMode().equals("cluster")) { - sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties"); - } - for (String name : sparkProperties.stringPropertyNames()) { - sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); - } - String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER"); - if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) || - !useProxyUserEnv.equals("false"))) { - sparkConfBuilder.append(" --proxy-user " + context.getUserName()); - } - - env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); - - // set these env in the order of - // 1. interpreter-setting - // 2. zeppelin-env.sh - // It is encouraged to set env in interpreter setting, but just for backward compatability, - // we also fallback to zeppelin-env.sh if it is not specified in interpreter setting. 
- for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) { - String envValue = getEnv(envName); - if (envValue != null) { - env.put(envName, envValue); - } - } - - String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB); - String principal = - zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL); - - if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) { - env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab); - env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal); - LOGGER.info("Run Spark under secure mode with keytab: " + keytab + - ", principal: " + principal); - } else { - LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified"); - } - LOGGER.debug("buildEnvFromProperties: " + env); - return env; - - } - - - /** - * get environmental variable in the following order - * - * 1. interpreter setting - * 2. zeppelin-env.sh - * - */ - private String getEnv(String envName) { - String env = properties.getProperty(envName); - if (env == null) { - env = System.getenv(envName); - } - return env; - } - - private boolean isSparkConf(String key, String value) { - return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); - } - - private void setupPropertiesForPySpark(Properties sparkProperties) { - if (isYarnMode()) { - sparkProperties.setProperty("spark.yarn.isPython", "true"); - } - } - - private void mergeSparkProperty(Properties sparkProperties, String propertyName, - String propertyValue) { - if (sparkProperties.containsKey(propertyName)) { - String oldPropertyValue = sparkProperties.getProperty(propertyName); - sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue); - } else { - sparkProperties.setProperty(propertyName, propertyValue); - } - } - - private void setupPropertiesForSparkR(Properties sparkProperties) { - String sparkHome = getEnv("SPARK_HOME"); - File sparkRBasePath = null; - if (sparkHome == null) { - if (!getSparkMaster(properties).startsWith("local")) { - throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" + - " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + - " interpreter setting"); - } - String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); - sparkRBasePath = new File(zeppelinHome, - "interpreter" + File.separator + "spark" + File.separator + "R"); - } else { - sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); - } - - File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); - if (sparkRPath.exists() && sparkRPath.isFile()) { - mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", - sparkRPath.getAbsolutePath() + "#sparkr"); - } else { - LOGGER.warn("sparkr.zip is not found, SparkR may not work."); - } - } - - /** - * Order to look for spark master - * 1. master in interpreter setting - * 2. spark.master interpreter setting - * 3. 
use local[*] - * @param properties - * @return - */ - private String getSparkMaster(Properties properties) { - String master = properties.getProperty("master"); - if (master == null) { - master = properties.getProperty("spark.master"); - if (master == null) { - master = "local[*]"; - } - } - return master; - } - - private String getDeployMode() { - String master = getSparkMaster(properties); - if (master.equals("yarn-client")) { - return "client"; - } else if (master.equals("yarn-cluster")) { - return "cluster"; - } else if (master.startsWith("local")) { - return "client"; - } else { - String deployMode = properties.getProperty("spark.submit.deployMode"); - if (deployMode == null) { - throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " + - "is not specified"); - } - if (!deployMode.equals("client") && !deployMode.equals("cluster")) { - throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode); - } - return deployMode; - } - } - - private boolean isYarnMode() { - return getSparkMaster(properties).startsWith("yarn"); - } - - private String toShellFormat(String value) { - if (value.contains("'") && value.contains("\"")) { - throw new RuntimeException("Spark property value could not contain both \" and '"); - } else if (value.contains("'")) { - return "\"" + value + "\""; - } else { - return "'" + value + "'"; - } - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java index f573b15..5f7dc1d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java @@ -17,19 +17,24 @@ package org.apache.zeppelin.plugin; +import com.google.common.annotations.VisibleForTesting; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; -import java.util.Iterator; +import java.util.HashMap; import java.util.List; -import java.util.ServiceLoader; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Class for loading Plugins @@ -42,6 +47,8 @@ public class PluginManager { private ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); private String pluginsDir = zConf.getPluginsDir(); + private Map<String, InterpreterLauncher> cachedLaunchers = new HashMap<>(); + public static synchronized PluginManager get() { if (instance == null) { instance = new PluginManager(); @@ -65,27 +72,13 @@ public class PluginManager { } String simpleClassName = notebookRepoClassName.substring(notebookRepoClassName.lastIndexOf(".") + 1); - File pluginFolder = new File(pluginsDir + "/NotebookRepo/" + simpleClassName); - if (!pluginFolder.exists() || pluginFolder.isFile()) { - LOGGER.warn("pluginFolder " + pluginFolder.getAbsolutePath() + - " doesn't exist or is not a directory"); - return null; 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
index f573b15..5f7dc1d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
@@ -17,19 +17,24 @@
 
 package org.apache.zeppelin.plugin;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
-import java.util.ServiceLoader;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Class for loading Plugins
@@ -42,6 +47,8 @@ public class PluginManager {
   private ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
   private String pluginsDir = zConf.getPluginsDir();
 
+  private Map<String, InterpreterLauncher> cachedLaunchers = new HashMap<>();
+
   public static synchronized PluginManager get() {
     if (instance == null) {
       instance = new PluginManager();
@@ -65,27 +72,13 @@ public class PluginManager {
 
     }
     String simpleClassName = notebookRepoClassName.substring(notebookRepoClassName.lastIndexOf(".") + 1);
-    File pluginFolder = new File(pluginsDir + "/NotebookRepo/" + simpleClassName);
-    if (!pluginFolder.exists() || pluginFolder.isFile()) {
-      LOGGER.warn("pluginFolder " + pluginFolder.getAbsolutePath() +
-          " doesn't exist or is not a directory");
-      return null;
-    }
-    List<URL> urls = new ArrayList<>();
-    for (File file : pluginFolder.listFiles()) {
-      LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of plugin "
-          + notebookRepoClassName);
-      urls.add(file.toURI().toURL());
-    }
-    if (urls.isEmpty()) {
-      LOGGER.warn("Can not load plugin " + notebookRepoClassName +
-          ", because the plugin folder " + pluginFolder + " is empty.");
+    URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, "NotebookRepo", simpleClassName);
+    if (pluginClassLoader == null) {
       return null;
     }
-    URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[0]));
     NotebookRepo notebookRepo = null;
    try {
-      notebookRepo = (NotebookRepo) (Class.forName(notebookRepoClassName, true, classLoader)).newInstance();
+      notebookRepo = (NotebookRepo) (Class.forName(notebookRepoClassName, true, pluginClassLoader)).newInstance();
     } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
       LOGGER.warn("Fail to instantiate notebookrepo from plugin classpath:" + notebookRepoClassName, e);
     }
@@ -96,4 +89,59 @@
     return notebookRepo;
   }
 
+  public synchronized InterpreterLauncher loadInterpreterLauncher(String launcherPlugin,
+                                                                  RecoveryStorage recoveryStorage)
+      throws IOException {
+
+    if (cachedLaunchers.containsKey(launcherPlugin)) {
+      return cachedLaunchers.get(launcherPlugin);
+    }
+    LOGGER.info("Loading Interpreter Launcher Plugin: " + launcherPlugin);
+    URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, "Launcher", launcherPlugin);
+    String pluginClass = "org.apache.zeppelin.interpreter.launcher." + launcherPlugin;
+    InterpreterLauncher launcher = null;
+    try {
+      launcher = (InterpreterLauncher) (Class.forName(pluginClass, true, pluginClassLoader))
+          .getConstructor(ZeppelinConfiguration.class, RecoveryStorage.class)
+          .newInstance(zConf, recoveryStorage);
+    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException
+        | NoSuchMethodException | InvocationTargetException e) {
+      LOGGER.warn("Fail to instantiate Launcher from plugin classpath:" + launcherPlugin, e);
+    }
+
+    if (launcher == null) {
+      throw new IOException("Fail to load plugin: " + launcherPlugin);
+    }
+    cachedLaunchers.put(launcherPlugin, launcher);
+    return launcher;
+  }
+
+  private URLClassLoader getPluginClassLoader(String pluginsDir,
+                                              String pluginType,
+                                              String pluginName) throws IOException {
+
+    File pluginFolder = new File(pluginsDir + "/" + pluginType + "/" + pluginName);
+    if (!pluginFolder.exists() || pluginFolder.isFile()) {
+      LOGGER.warn("PluginFolder " + pluginFolder.getAbsolutePath() +
+          " doesn't exist or is not a directory");
+      return null;
+    }
+    List<URL> urls = new ArrayList<>();
+    for (File file : pluginFolder.listFiles()) {
+      LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of plugin: "
+          + pluginName);
+      urls.add(file.toURI().toURL());
+    }
+    if (urls.isEmpty()) {
+      LOGGER.warn("Can not load plugin " + pluginName +
+          ", because the plugin folder " + pluginFolder + " is empty.");
+      return null;
+    }
+    return new URLClassLoader(urls.toArray(new URL[0]));
+  }
+
+  @VisibleForTesting
+  public static void reset() {
+    instance = null;
+  }
 }
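With the PluginManager changes above, launchers are looked up by plugin name instead of being instantiated directly. Below is a hedged sketch of the expected call site (presumably the InterpreterSetting change elsewhere in this commit); the plugin name has to match both a folder under the configured plugins dir, i.e. <pluginsDir>/Launcher/<name>, and a class of that name in the org.apache.zeppelin.interpreter.launcher package.

// Sketch only: obtaining a launcher through the plugin API added above.
// "SparkInterpreterLauncher" resolves to <pluginsDir>/Launcher/SparkInterpreterLauncher
// and to org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher.
import java.io.IOException;

import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.plugin.PluginManager;

public class LauncherLookupSketch {
  public static InterpreterLauncher lookupSparkLauncher(RecoveryStorage recoveryStorage)
      throws IOException {
    // Repeated calls with the same plugin name return the cached instance;
    // an unknown name surfaces as IOException("Fail to load plugin: ...").
    return PluginManager.get()
        .loadInterpreterLauncher("SparkInterpreterLauncher", recoveryStorage);
  }
}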
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
deleted file mode 100644
index ace3f31..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ShellScriptLauncherTest {
-  @Before
-  public void setUp() {
-    for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
-      System.clearProperty(confVar.getVarName());
-    }
-  }
-
-  @Test
-  public void testLauncher() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("ENV_1", "VALUE_1");
-    properties.setProperty("property_1", "value_1");
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
-    assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
-        interpreterProcess.getConnectTimeout());
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertEquals(1, interpreterProcess.getEnv().size());
-    assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
-    assertEquals(true, interpreterProcess.isUserImpersonated());
-  }
-
-  @Test
-  public void testConnectTimeOut() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty(
-        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
-    assertEquals(10000, interpreterProcess.getConnectTimeout());
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertEquals(0, interpreterProcess.getEnv().size());
-    assertEquals(true, interpreterProcess.isUserImpersonated());
-  }
-
-}
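The deleted test above is not lost: it reappears alongside the new standard launcher plugin as StandardInterpreterLauncherTest. Its first case hinges on the launcher exporting ALL_CAPS properties (ENV_1) into the interpreter process environment while leaving ordinary keys (property_1) as interpreter properties. A rough sketch of that splitting rule follows, under the assumption that the check is simply "upper-case letters, digits and underscores"; the exact predicate in the real launcher may differ, and the names here are illustrative.

// Sketch only: the env-vs-property split the deleted test exercises.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class EnvSplitSketch {

  // Assumed heuristic: keys that look like environment variable names.
  static boolean looksLikeEnvName(String key) {
    return key.matches("[A-Z_][A-Z0-9_]*");
  }

  // Collect the properties that would be exported into the process environment.
  static Map<String, String> extractEnv(Properties properties) {
    Map<String, String> env = new HashMap<>();
    for (String key : properties.stringPropertyNames()) {
      if (looksLikeEnvName(key)) {
        env.put(key, properties.getProperty(key));
      }
    }
    return env;
  }

  public static void main(String[] args) {
    Properties p = new Properties();
    p.setProperty("ENV_1", "VALUE_1");      // becomes an env variable
    p.setProperty("property_1", "value_1"); // stays an interpreter property
    System.out.println(extractEnv(p));      // prints {ENV_1=VALUE_1}
  }
}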
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
deleted file mode 100644
index c2abd60..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class SparkInterpreterLauncherTest {
-  @Before
-  public void setUp() {
-    for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
-      System.clearProperty(confVar.getVarName());
-    }
-  }
-
-  @Test
-  public void testConnectTimeOut() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty(
-        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
-    assertEquals(10000, interpreterProcess.getConnectTimeout());
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals(true, interpreterProcess.isUserImpersonated());
-  }
-
-  @Test
-  public void testLocalMode() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "local[*]");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master local[*] --conf spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClientMode_1() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn-client");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClientMode_2() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
-    properties.setProperty("spark.submit.deployMode", "client");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClusterMode_1() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn-cluster");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-
-  @Test
-  public void testYarnClusterMode_2() throws IOException {
-    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
-    Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
-    properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
-    properties.setProperty("spark.submit.deployMode", "cluster");
-    properties.setProperty("spark.files", "file_1");
-    properties.setProperty("spark.jars", "jar_1");
-
-    InterpreterOption option = new InterpreterOption();
-    option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
-    InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
-    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
-    assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
-    assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
-    assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
-    assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1",
-        interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
-  }
-}
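The assertions above pin down, per master/deploy-mode combination, the exact ZEPPELIN_SPARK_CONF string the launcher exports; the same checks now live in the test of the same name inside the new spark launcher plugin. Purely as a reading aid, here is a rough sketch of how such an options string can be assembled from the interpreter properties. It is illustrative, not the committed SparkInterpreterLauncher logic, and the class and method names are made up.

// Sketch only: build a spark-submit options string of the shape asserted above,
// e.g. " --master yarn-cluster --conf spark.files='file_1' --conf spark.jars='jar_1'".
import java.util.Properties;
import java.util.TreeSet;

public class SparkConfStringSketch {

  static String buildSparkConf(String master, Properties properties) {
    StringBuilder conf = new StringBuilder(" --master ").append(master);
    // Sorted keys give a deterministic result; the real launcher keeps its own ordering
    // and also appends mode-specific flags such as --files and --proxy-user.
    for (String key : new TreeSet<>(properties.stringPropertyNames())) {
      if (key.startsWith("spark.")) {
        conf.append(" --conf ").append(key).append("='")
            .append(properties.getProperty(key)).append("'");
      }
    }
    return conf.toString();
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("spark.files", "file_1");
    properties.setProperty("spark.jars", "jar_1");
    // Prints: " --master yarn-cluster --conf spark.files='file_1' --conf spark.jars='jar_1'"
    System.out.println(buildSparkConf("yarn-cluster", properties));
  }
}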