ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter ### What is this PR for? This is the first version of yarn-cluster support for `SparkInterpreter`. I simply delegate all the functionality to `spark-submit`: yarn-cluster mode is natively supported by Spark, so we don't need to reinvent the wheel. There are still improvements to be made in the future, e.g. I put some Spark-specific logic in `InterpreterSetting`, which is not good practice. I plan to improve it when I refactor the `Interpreter` class (ZEPPELIN-2685).
Besides that, I also added `MiniHadoopCluster` & `MiniZeppelin`, which help with integration testing of yarn-client & yarn-cluster mode; otherwise I would have to verify yarn-client & yarn-cluster mode manually, which could easily lead to regressions in the future. Please note: * SPARK_HOME must be specified for yarn-cluster mode * HADOOP_CONF_DIR must be specified for yarn-cluster mode ### What type of PR is it? [Feature] ### Todos * [ ] - Task ### What is the Jira issue? https://github.com/zjffdu/zeppelin/tree/ZEPPELIN-2898 ### How should this be tested? A system test is added in `SparkInterpreterIT`. ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2577 from zjffdu/ZEPPELIN-2898 and squashes the following commits: 9da7c4b [Jeff Zhang] ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5d715109 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5d715109 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5d715109 Branch: refs/heads/master Commit: 5d7151097e171c5ec9f22f150ac4ce92b5512013 Parents: d25639c Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Sep 4 21:54:56 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Tue Sep 19 06:40:51 2017 +0800 ---------------------------------------------------------------------- .travis.yml | 4 +- bin/common.sh | 6 +- bin/interpreter.sh | 12 +- conf/log4j_yarn_cluster.properties | 23 ++ docs/interpreter/spark.md | 2 +- .../zeppelin/python/IPythonInterpreter.java | 44 +-- .../zeppelin/spark/IPySparkInterpreter.java | 11 +- .../zeppelin/spark/PySparkInterpreter.java | 22 +- .../org/apache/zeppelin/spark/PythonUtils.java | 96 +++++++ .../apache/zeppelin/spark/SparkInterpreter.java | 99 +------ 
.../src/main/resources/interpreter-setting.json | 4 +- .../zeppelin/spark/IPySparkInterpreterTest.java | 1 + .../spark/PySparkInterpreterMatplotlibTest.java | 2 +- .../zeppelin/spark/PySparkInterpreterTest.java | 2 +- .../interpreter/InterpreterContext.java | 47 ++- zeppelin-server/pom.xml | 12 + .../zeppelin/integration/AuthenticationIT.java | 2 +- .../integration/InterpreterModeActionsIT.java | 2 +- .../integration/PersonalizeActionsIT.java | 2 +- .../zeppelin/rest/AbstractTestRestApi.java | 8 +- zeppelin-zengine/pom.xml | 286 ++++++++++++++++++- .../zeppelin/conf/ZeppelinConfiguration.java | 2 +- .../interpreter/InterpreterSetting.java | 131 ++++++++- .../interpreter/InterpreterSettingManager.java | 15 +- .../remote/RemoteInterpreterEventPoller.java | 6 +- .../remote/RemoteInterpreterManagedProcess.java | 2 + .../remote/RemoteInterpreterProcess.java | 2 +- .../interpreter/AbstractInterpreterTest.java | 23 +- .../zeppelin/interpreter/MiniHadoopCluster.java | 114 ++++++++ .../zeppelin/interpreter/MiniZeppelin.java | 68 +++++ .../interpreter/SparkInterpreterModeTest.java | 147 ++++++++++ .../notebook/repo/VFSNotebookRepoTest.java | 4 +- 32 files changed, 1007 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 0d9e72a..4495aa4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -66,7 +66,8 @@ matrix: # Several tests were excluded from this configuration due to the following issues: # HeliumApplicationFactoryTest - https://issues.apache.org/jira/browse/ZEPPELIN-2470 # After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property - - jdk: "oraclejdk8" + - sudo: required + jdk: "oraclejdk8" dist: precise env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" @@ -143,6 +144,7 @@ before_script: - if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi - if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-$LIVY_VER-bin; fi - if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi + - export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER - echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh - echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh - tail conf/zeppelin-env.sh http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/bin/common.sh ---------------------------------------------------------------------- diff --git a/bin/common.sh b/bin/common.sh index c7100c7..d425cb1 100644 --- a/bin/common.sh +++ b/bin/common.sh @@ -122,7 +122,11 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" export JAVA_OPTS JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}" -JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" +if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then + JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" +else + JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties" +fi export JAVA_INTP_OPTS http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index fd93a06..5245e25 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -143,7 +143,12 @@ if [[ 
"${INTERPRETER_ID}" == "spark" ]]; then export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}" fi unset PYSPARKPATH + export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}" + fi + if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" + else # autodetect HADOOP_CONF_HOME by heuristic if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then @@ -152,13 +157,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then export HADOOP_CONF_DIR="/etc/hadoop/conf" fi fi - - if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then - ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" - fi - - export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}" fi + elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then if [[ -n "${HBASE_CONF_DIR}" ]]; then ZEPPELIN_INTP_CLASSPATH+=":${HBASE_CONF_DIR}" http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/conf/log4j_yarn_cluster.properties ---------------------------------------------------------------------- diff --git a/conf/log4j_yarn_cluster.properties b/conf/log4j_yarn_cluster.properties new file mode 100644 index 0000000..532fc5e --- /dev/null +++ b/conf/log4j_yarn_cluster.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger = INFO, stdout + +log4j.appender.stdout = org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout = org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/docs/interpreter/spark.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md index 780c60a..be5b3e5 100644 --- a/docs/interpreter/spark.md +++ b/docs/interpreter/spark.md @@ -424,7 +424,7 @@ It creates separated SparkContext per each notebook in `isolated` mode. ## IPython support By default, zeppelin would use IPython in `pyspark` when IPython is available, Otherwise it would fall back to the original PySpark implementation. -If you don't want to use IPython, then you can set `zeppelin.spark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc +If you don't want to use IPython, then you can set `zeppelin.pyspark.useIPython` as `false` in interpreter setting. 
For the IPython features, you can refer doc [Python Interpreter](python.html) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java index 9b6f730..193c343 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java @@ -78,6 +78,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand private long ipythonLaunchTimeout; private String additionalPythonPath; private String additionalPythonInitFile; + private boolean useBuiltinPy4j = true; private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER); @@ -92,6 +93,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand * @param additionalPythonPath */ public void setAdditionalPythonPath(String additionalPythonPath) { + LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath); this.additionalPythonPath = additionalPythonPath; } @@ -105,6 +107,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand this.additionalPythonInitFile = additionalPythonInitFile; } + public void setAddBulitinPy4j(boolean add) { + this.useBuiltinPy4j = add; + } + @Override public void open() { try { @@ -113,6 +119,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand return; } pythonExecutable = getProperty().getProperty("zeppelin.python", "python"); + LOGGER.info("Python Exec: " + pythonExecutable); ipythonLaunchTimeout = Long.parseLong( getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000")); this.zeppelinContext = new PythonZeppelinContext( @@ -218,29 +225,34 @@ public 
class IPythonInterpreter extends Interpreter implements ExecuteResultHand watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); executor.setWatchdog(watchDog); - String py4jLibPath = null; - if (System.getenv("ZEPPELIN_HOME") != null) { - py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator - + PythonInterpreter.ZEPPELIN_PY4JPATH; - } else { - Path workingPath = Paths.get("..").toAbsolutePath(); - py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH; - } - if (additionalPythonPath != null) { - // put the py4j at the end, because additionalPythonPath may already contain py4j. - // e.g. PySparkInterpreter - additionalPythonPath = additionalPythonPath + ":" + py4jLibPath; - } else { - additionalPythonPath = py4jLibPath; + if (useBuiltinPy4j) { + String py4jLibPath = null; + if (System.getenv("ZEPPELIN_HOME") != null) { + py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + + PythonInterpreter.ZEPPELIN_PY4JPATH; + } else { + Path workingPath = Paths.get("..").toAbsolutePath(); + py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH; + } + if (additionalPythonPath != null) { + // put the py4j at the end, because additionalPythonPath may already contain py4j. + // e.g. 
PySparkInterpreter + additionalPythonPath = additionalPythonPath + ":" + py4jLibPath; + } else { + additionalPythonPath = py4jLibPath; + } } + Map<String, String> envs = EnvironmentUtils.getProcEnvironment(); if (envs.containsKey("PYTHONPATH")) { - envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH")); + if (additionalPythonPath != null) { + envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH")); + } } else { envs.put("PYTHONPATH", additionalPythonPath); } - LOGGER.debug("PYTHONPATH: " + envs.get("PYTHONPATH")); + LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH")); executor.execute(cmd, envs, this); // wait until IPython kernel is started or timeout http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java index f1b1435..56b3823 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -44,12 +44,15 @@ public class IPySparkInterpreter extends IPythonInterpreter { @Override public void open() { - getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property)); + property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property)); sparkInterpreter = getSparkInterpreter(); SparkConf conf = sparkInterpreter.getSparkContext().getConf(); - String additionalPythonPath = conf.get("spark.submit.pyFiles").replaceAll(",", ":") + - ":../interpreter/lib/python"; - setAdditionalPythonPath(additionalPythonPath); + // only set PYTHONPATH in local or yarn-client mode. + // yarn-cluster will setup PYTHONPATH automatically. 
+ if (!conf.get("spark.submit.deployMode").equals("cluster")) { + setAdditionalPythonPath(PythonUtils.sparkPythonPath()); + setAddBulitinPy4j(false); + } setAdditionalPythonInitFile("python/zeppelin_ipyspark.py"); super.open(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index e65df22..dd32059 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -115,7 +115,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand public void open() { // try IPySparkInterpreter first iPySparkInterpreter = getIPySparkInterpreter(); - if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") && + if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") && iPySparkInterpreter.checkIPythonPrerequisite()) { try { iPySparkInterpreter.open(); @@ -133,7 +133,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } iPySparkInterpreter = null; - if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) { + if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) { // don't print it when it is in testing, just for easy output check in test. 
try { InterpreterContext.get().out.write(("IPython is not available, " + @@ -202,13 +202,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - private Map setupPySparkEnv() throws IOException{ + private Map setupPySparkEnv() throws IOException { Map env = EnvironmentUtils.getProcEnvironment(); - if (!env.containsKey("PYTHONPATH")) { - SparkConf conf = getSparkConf(); - env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + - ":../interpreter/lib/python"); + // only set PYTHONPATH in local or yarn-client mode. + // yarn-cluster will setup PYTHONPATH automatically. + SparkConf conf = getSparkConf(); + if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) { + if (!env.containsKey("PYTHONPATH")) { + env.put("PYTHONPATH", PythonUtils.sparkPythonPath()); + } else { + env.put("PYTHONPATH", PythonUtils.sparkPythonPath()); + } } // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT @@ -223,7 +228,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH")); + LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH")); return env; } @@ -251,6 +256,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand gatewayServer.start(); String pythonExec = getPythonExec(property); + LOGGER.info("pythonExec: " + pythonExec); CommandLine cmd = CommandLine.parse(pythonExec); cmd.addArgument(scriptPath, false); cmd.addArgument(Integer.toString(port), false); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java new file mode 100644 index 0000000..8182690 --- /dev/null +++ 
b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.ArrayList; +import java.util.List; + +/** + * Util class for PySpark + */ +public class PythonUtils { + + /** + * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME + * when it is embedded mode. + * + * This method will called in zeppelin server process and spark driver process when it is + * local or yarn-client mode. + */ + public static String sparkPythonPath() { + List<String> pythonPath = new ArrayList<String>(); + String sparkHome = System.getenv("SPARK_HOME"); + String zeppelinHome = System.getenv("ZEPPELIN_HOME"); + if (zeppelinHome == null) { + zeppelinHome = new File("..").getAbsolutePath(); + } + if (sparkHome != null) { + // non-embedded mode when SPARK_HOME is specified. 
+ File pyspark = new File(sparkHome, "python/lib/pyspark.zip"); + if (!pyspark.exists()) { + throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib"); + } + pythonPath.add(pyspark.getAbsolutePath()); + File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("py4j"); + } + }); + if (py4j.length == 0) { + throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib"); + } else if (py4j.length > 1) { + throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib"); + } else { + pythonPath.add(py4j[0].getAbsolutePath()); + } + } else { + // embedded mode + File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip"); + if (!pyspark.exists()) { + throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath()); + } + pythonPath.add(pyspark.getAbsolutePath()); + File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles( + new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("py4j"); + } + }); + if (py4j.length == 0) { + throw new RuntimeException("No py4j files found under " + zeppelinHome + + "/interpreter/spark/pyspark"); + } else if (py4j.length > 1) { + throw new RuntimeException("Multiple py4j files found under " + sparkHome + + "/interpreter/spark/pyspark"); + } else { + pythonPath.add(py4j[0].getAbsolutePath()); + } + } + + // add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases + pythonPath.add(zeppelinHome + "/interpreter/lib/python"); + return StringUtils.join(pythonPath, ":"); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index fd12a72..18da034 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -351,7 +351,11 @@ public class SparkInterpreter extends Interpreter { } public boolean isYarnMode() { - return getProperty("master").startsWith("yarn"); + String master = getProperty("master"); + if (master == null) { + master = getProperty().getProperty("spark.master", "local[*]"); + } + return master.startsWith("yarn"); } /** @@ -371,11 +375,6 @@ public class SparkInterpreter extends Interpreter { conf.set("spark.executor.uri", execUri); } conf.set("spark.scheduler.mode", "FAIR"); - conf.setMaster(getProperty("master")); - if (isYarnMode()) { - conf.set("master", "yarn"); - conf.set("spark.submit.deployMode", "client"); - } Properties intpProperty = getProperty(); for (Object k : intpProperty.keySet()) { @@ -394,8 +393,6 @@ public class SparkInterpreter extends Interpreter { } } - setupConfForPySpark(conf); - setupConfForSparkR(conf); Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession"); Object builder = Utils.invokeStaticMethod(SparkSession, "builder"); Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf }); @@ -529,96 +526,10 @@ public class SparkInterpreter extends Interpreter { } } } - setupConfForPySpark(conf); - setupConfForSparkR(conf); SparkContext sparkContext = new SparkContext(conf); return sparkContext; } - private void setupConfForPySpark(SparkConf conf) { - Object pysparkBaseProperty = - new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue(); - String pysparkBasePath = pysparkBaseProperty != null ? 
pysparkBaseProperty.toString() : null; - File pysparkPath; - if (null == pysparkBasePath) { - pysparkBasePath = - new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../") - .getValue().toString(); - pysparkPath = new File(pysparkBasePath, - "interpreter" + File.separator + "spark" + File.separator + "pyspark"); - } else { - pysparkPath = new File(pysparkBasePath, - "python" + File.separator + "lib"); - } - - //Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist - //TODO(zjffdu), this is not maintainable when new version is added. - String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip", - "py4j-0.10.1-src.zip", "py4j-0.10.3-src.zip", "py4j-0.10.4-src.zip"}; - ArrayList<String> pythonLibUris = new ArrayList<>(); - for (String lib : pythonLibs) { - File libFile = new File(pysparkPath, lib); - if (libFile.exists()) { - pythonLibUris.add(libFile.toURI().toString()); - } - } - pythonLibUris.trimToSize(); - - // Distribute two libraries(pyspark.zip and py4j-*.zip) to workers - // when spark version is less than or equal to 1.4.1 - if (pythonLibUris.size() == 2) { - try { - String confValue = conf.get("spark.yarn.dist.files"); - conf.set("spark.yarn.dist.files", confValue + "," + Joiner.on(",").join(pythonLibUris)); - } catch (NoSuchElementException e) { - conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris)); - } - if (!useSparkSubmit()) { - conf.set("spark.files", conf.get("spark.yarn.dist.files")); - } - conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs)); - conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris)); - } - - // Distributes needed libraries to workers - // when spark version is greater than or equal to 1.5.0 - if (isYarnMode()) { - conf.set("spark.yarn.isPython", "true"); - } - } - - private void setupConfForSparkR(SparkConf conf) { - Object sparkRBaseProperty = - new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue(); - String 
sparkRBasePath = sparkRBaseProperty != null ? sparkRBaseProperty.toString() : null; - File sparkRPath; - if (null == sparkRBasePath) { - sparkRBasePath = - new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../") - .getValue().toString(); - sparkRPath = new File(sparkRBasePath, - "interpreter" + File.separator + "spark" + File.separator + "R"); - } else { - sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib"); - } - - sparkRPath = new File(sparkRPath, "sparkr.zip"); - if (sparkRPath.exists() && sparkRPath.isFile()) { - String archives = null; - if (conf.contains("spark.yarn.dist.archives")) { - archives = conf.get("spark.yarn.dist.archives"); - } - if (archives != null) { - archives = archives + "," + sparkRPath + "#sparkr"; - } else { - archives = sparkRPath + "#sparkr"; - } - conf.set("spark.yarn.dist.archives", archives); - } else { - logger.warn("sparkr.zip is not found, sparkr may not work."); - } - } - static final String toString(Object o) { return (o instanceof String) ? 
(String) o : ""; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json index d646805..7c13c49 100644 --- a/spark/src/main/resources/interpreter-setting.json +++ b/spark/src/main/resources/interpreter-setting.json @@ -150,9 +150,9 @@ "description": "Python command to run pyspark with", "type": "string" }, - "zeppelin.spark.useIPython": { + "zeppelin.pyspark.useIPython": { "envName": null, - "propertyName": "zeppelin.spark.useIPython", + "propertyName": "zeppelin.pyspark.useIPython", "defaultValue": true, "description": "whether use IPython when it is available", "type": "checkbox" http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java index 5a2e884..3f7cf75 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java @@ -59,6 +59,7 @@ public class IPySparkInterpreterTest { Properties p = new Properties(); p.setProperty("spark.master", "local[4]"); p.setProperty("master", "local[4]"); + p.setProperty("spark.submit.deployMode", "client"); p.setProperty("spark.app.name", "Zeppelin Test"); p.setProperty("zeppelin.spark.useHiveContext", "true"); p.setProperty("zeppelin.spark.maxResult", "1000"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java ---------------------------------------------------------------------- 
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java index c6eb1d4..d695037 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java @@ -89,7 +89,7 @@ public class PySparkInterpreterMatplotlibTest { p.setProperty("zeppelin.spark.importImplicit", "true"); p.setProperty("zeppelin.pyspark.python", "python"); p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty("zeppelin.spark.useIPython", "false"); + p.setProperty("zeppelin.pyspark.useIPython", "false"); return p; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index ffdb4e8..7a4abd6 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -59,7 +59,7 @@ public class PySparkInterpreterTest { p.setProperty("zeppelin.spark.importImplicit", "true"); p.setProperty("zeppelin.pyspark.python", "python"); p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty("zeppelin.spark.useIPython", "false"); + p.setProperty("zeppelin.pyspark.useIPython", "false"); return p; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 4288ea3..f5fc70b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.interpreter; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,7 +36,7 @@ import org.apache.zeppelin.resource.ResourcePool; public class InterpreterContext { private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>(); - public final InterpreterOutput out; + public InterpreterOutput out; public static InterpreterContext get() { return threadIC.get(); @@ -48,21 +50,46 @@ public class InterpreterContext { threadIC.remove(); } - private final String noteId; - private final String replName; - private final String paragraphTitle; - private final String paragraphId; - private final String paragraphText; + private String noteId; + private String replName; + private String paragraphTitle; + private String paragraphId; + private String paragraphText; private AuthenticationInfo authenticationInfo; - private final Map<String, Object> config; - private GUI gui; + private Map<String, Object> config = new HashMap<>(); + private GUI gui = new GUI(); private AngularObjectRegistry angularObjectRegistry; private ResourcePool resourcePool; - private List<InterpreterContextRunner> runners; + private List<InterpreterContextRunner> runners = new ArrayList<>(); private String className; private RemoteEventClientWrapper client; private RemoteWorksController remoteWorksController; - private final Map<String, Integer> progressMap; + private Map<String, Integer> progressMap; + + /** + * Builder class for InterpreterContext + */ + public static class Builder { + 
private InterpreterContext context = new InterpreterContext(); + + public Builder setNoteId(String noteId) { + context.noteId = noteId; + return this; + } + + public Builder setParagraphId(String paragraphId) { + context.paragraphId = paragraphId; + return this; + } + + public InterpreterContext getContext() { + return context; + } + } + + private InterpreterContext() { + + } // visible for testing public InterpreterContext(String noteId, http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 24d5ee7..e8db0c5 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -93,6 +93,18 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java index 7debf1b..3d1406a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java @@ -83,7 +83,7 @@ public class AuthenticationIT extends AbstractZeppelinIT { } try { - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../"); + 
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath()); ZeppelinConfiguration conf = ZeppelinConfiguration.create(); shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir())); File file = new File(shiroPath); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java index 7f8765f..999e796 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java @@ -82,7 +82,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT { return; } try { - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath()); ZeppelinConfiguration conf = ZeppelinConfiguration.create(); shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir())); interpreterOptionPath = conf.getRelativeDir(String.format("%s/interpreter.json", conf.getConfDir())); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java index b813ea9..dc07435 100644 --- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java @@ -74,7 +74,7 @@ public class PersonalizeActionsIT extends AbstractZeppelinIT { return; } try { - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath()); ZeppelinConfiguration conf = ZeppelinConfiguration.create(); shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir())); File file = new File(shiroPath); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index e2f171f..7675cf6 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -126,8 +126,8 @@ public abstract class AbstractTestRestApi { private static void start(boolean withAuth) throws Exception { if (!wasRunning) { - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../"); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath()); // some test profile does not build zeppelin-web. 
// to prevent zeppelin starting up fail, create zeppelin-web/dist directory @@ -211,7 +211,7 @@ public abstract class AbstractTestRestApi { // set spark home for pyspark sparkProperties.put("spark.home", new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); + sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); sparkIntpSetting.setProperties(sparkProperties); pySpark = true; @@ -234,7 +234,7 @@ public abstract class AbstractTestRestApi { new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue())); sparkProperties.put("zeppelin.spark.useHiveContext", new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue())); - sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); + sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); pySpark = true; sparkR = true; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 314ca18..c67df6b 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -36,7 +36,7 @@ <properties> <!--library versions--> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.7.3</hadoop.version> <commons.lang3.version>3.4</commons.lang3.version> <commons.vfs2.version>2.0</commons.vfs2.version> <aws.sdk.s3.version>1.10.62</aws.sdk.s3.version> @@ -214,12 +214,6 @@ 
</exclusions> </dependency> - <dependency> <!-- because there are two of them above --> - <groupId>xml-apis</groupId> - <artifactId>xml-apis</artifactId> - <version>${xml.apis.version}</version> - </dependency> - <dependency> <groupId>org.eclipse.jgit</groupId> <artifactId>org.eclipse.jgit</artifactId> @@ -299,21 +293,91 @@ </dependency> <dependency> - <groupId>org.mongodb</groupId> - <artifactId>mongo-java-driver</artifactId> - <version>3.4.1</version> - </dependency> - <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-compress</artifactId> <version>1.5</version> </dependency> <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>3.4.1</version> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>jackrabbit-webdav</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jgit</groupId> + <artifactId>org.eclipse.jgit</artifactId> + </exclusion> + <exclusion> + <groupId>com.jcraft</groupId> + 
<artifactId>jsch</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>xml-apis</groupId> + <artifactId>xml-apis</artifactId> + </exclusion> + <exclusion> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <classifier>tests</classifier> + <scope>test</scope> <exclusions> <exclusion> <groupId>com.sun.jersey</groupId> @@ -325,9 +389,90 @@ </exclusion> <exclusion> <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> <artifactId>jersey-server</artifactId> </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>jackrabbit-webdav</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jgit</groupId> + <artifactId>org.eclipse.jgit</artifactId> + </exclusion> + <exclusion> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>xml-apis</groupId> + <artifactId>xml-apis</artifactId> + </exclusion> + <exclusion> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + </exclusion> + 
<exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <!--<exclusion>--> + <!--<groupId>com.sun.jersey</groupId>--> + <!--<artifactId>jersey-core</artifactId>--> + <!--</exclusion>--> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <!--<exclusion>--> + <!--<groupId>com.sun.jersey</groupId>--> + <!--<artifactId>jersey-server</artifactId>--> + <!--</exclusion>--> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> @@ -349,8 +494,74 @@ <artifactId>commons-httpclient</artifactId> </exclusion> <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> + <groupId>org.eclipse.jgit</groupId> + <artifactId>org.eclipse.jgit</artifactId> + </exclusion> + <exclusion> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>xml-apis</groupId> + <artifactId>xml-apis</artifactId> + </exclusion> + <exclusion> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-yarn-server-tests</artifactId> + <version>${hadoop.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <!--<exclusion>--> + <!--<groupId>com.sun.jersey</groupId>--> + <!--<artifactId>jersey-json</artifactId>--> + <!--</exclusion>--> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>jackrabbit-webdav</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jgit</groupId> @@ -373,9 +584,50 @@ <artifactId>xercesImpl</artifactId> </exclusion> <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-spark_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + 
<exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> + <exclusion> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-context</artifactId> + </exclusion> </exclusions> </dependency> </dependencies> @@ -392,6 +644,12 @@ <artifactId>maven-surefire-plugin</artifactId> <configuration> <forkMode>always</forkMode> + <systemProperties> + <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir> + </systemProperties> + <environmentVariables> + <!--<ZEPPELIN_HOME>..</ZEPPELIN_HOME>--> + </environmentVariables> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index ba90ed8..2dec19c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -493,7 +493,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { } public String getConfDir() { - return getString(ConfVars.ZEPPELIN_CONF_DIR); + return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR); } public List<String> getAllowedOrigins() http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 79618a3..9a453d8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.interpreter; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.gson.JsonArray; @@ -26,6 +27,7 @@ import com.google.gson.JsonObject; import com.google.gson.annotations.SerializedName; import com.google.gson.internal.StringMap; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.DependencyResolver; @@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; +import java.io.FilenameFilter; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.URL; @@ -55,6 +58,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -452,7 +456,9 @@ public class InterpreterSetting { Properties jProperties = new Properties(); Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties; for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) { - jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString()); + if (entry.getValue().getValue() != null) { + 
jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString()); + } } if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) { @@ -707,22 +713,133 @@ public class InterpreterSetting { interpreterRunner != null ? interpreterRunner.getPath() : conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(), interpreterDir, localRepoPath, - getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout, + getEnvFromInterpreterProperty(), connectTimeout, remoteInterpreterProcessListener, appEventListener, group); } return remoteInterpreterProcess; } - private Map<String, String> getEnvFromInterpreterProperty(Properties property) { - Map<String, String> env = new HashMap<>(); - for (Object key : property.keySet()) { - if (RemoteInterpreterUtils.isEnvString((String) key)) { - env.put((String) key, property.getProperty((String) key)); + private boolean isSparkConf(String key, String value) { + return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); + } + + private Map<String, String> getEnvFromInterpreterProperty() { + Map<String, String> env = new HashMap<String, String>(); + Properties javaProperties = getJavaProperties(); + Properties sparkProperties = new Properties(); + String sparkMaster = getSparkMaster(); + for (String key : javaProperties.stringPropertyNames()) { + if (RemoteInterpreterUtils.isEnvString(key)) { + env.put(key, javaProperties.getProperty(key)); } + if (isSparkConf(key, javaProperties.getProperty(key))) { + sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key))); + } + } + + setupPropertiesForPySpark(sparkProperties); + setupPropertiesForSparkR(sparkProperties, javaProperties.getProperty("SPARK_HOME")); + if (isYarnMode() && getDeployMode().equals("cluster")) { + env.put("SPARK_YARN_CLUSTER", "true"); } + + StringBuilder sparkConfBuilder = new StringBuilder(); + if (sparkMaster != null) { + sparkConfBuilder.append(" --master " + sparkMaster); + } + if 
(isYarnMode() && getDeployMode().equals("cluster")) { + sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties"); + } + for (String name : sparkProperties.stringPropertyNames()) { + sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); + } + + env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); + LOGGER.debug("getEnvFromInterpreterProperty: " + env); return env; } + private void setupPropertiesForPySpark(Properties sparkProperties) { + if (isYarnMode()) { + sparkProperties.setProperty("spark.yarn.isPython", "true"); + } + } + + private void mergeSparkProperty(Properties sparkProperties, String propertyName, + String propertyValue) { + if (sparkProperties.containsKey(propertyName)) { + String oldPropertyValue = sparkProperties.getProperty(propertyName); + sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue); + } else { + sparkProperties.setProperty(propertyName, propertyValue); + } + } + + private void setupPropertiesForSparkR(Properties sparkProperties, + String sparkHome) { + File sparkRBasePath = null; + if (sparkHome == null) { + if (!getSparkMaster().startsWith("local")) { + throw new RuntimeException("SPARK_HOME is not specified for non-local mode"); + } + String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); + sparkRBasePath = new File(zeppelinHome, + "interpreter" + File.separator + "spark" + File.separator + "R"); + } else { + sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); + } + + File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); + if (sparkRPath.exists() && sparkRPath.isFile()) { + mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath()); + } else { + LOGGER.warn("sparkr.zip is not found, SparkR may not work."); + } + } + + private String getSparkMaster() { + String master = getJavaProperties().getProperty("master"); + if (master == null) { + master = 
getJavaProperties().getProperty("spark.master", "local[*]"); + } + return master; + } + + private String getDeployMode() { + String master = getSparkMaster(); + if (master.equals("yarn-client")) { + return "client"; + } else if (master.equals("yarn-cluster")) { + return "cluster"; + } else if (master.startsWith("local")) { + return "client"; + } else { + String deployMode = getJavaProperties().getProperty("spark.submit.deployMode"); + if (deployMode == null) { + throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " + + "is not specified"); + } + if (!deployMode.equals("client") && !deployMode.equals("cluster")) { + throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode); + } + return deployMode; + } + } + + private boolean isYarnMode() { + return getSparkMaster().startsWith("yarn"); + } + + private String toShellFormat(String value) { + if (value.contains("\'") && value.contains("\"")) { + throw new RuntimeException("Spark property value could not contain both \" and '"); + } else if (value.contains("\'")) { + return "\"" + value + "\""; + } else { + return "\'" + value + "\'"; + } + } + private List<Interpreter> getOrCreateSession(String user, String noteId) { ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId); Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 585a58a..73babab 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -142,7 +142,7 @@ public class InterpreterSettingManager { this.interpreterDirPath = Paths.get(conf.getInterpreterDir()); LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath); this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath()); - LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath); + LOGGER.debug("InterpreterSettingPath: {}", interpreterSettingPath); this.dependencyResolver = new DependencyResolver( conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO)); this.interpreterRepositories = dependencyResolver.getRepos(); @@ -283,7 +283,7 @@ public class InterpreterSettingManager { private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, String interpreterJson) throws IOException { URL[] urls = recursiveBuildLibList(new File(interpreterDir)); - ClassLoader tempClassLoader = new URLClassLoader(urls, cl); + ClassLoader tempClassLoader = new URLClassLoader(urls, null); URL url = tempClassLoader.getResource(interpreterJson); if (url == null) { @@ -392,6 +392,17 @@ public class InterpreterSettingManager { return settings; } + public InterpreterSetting getInterpreterSettingByName(String name) { + synchronized (interpreterSettings) { + for (InterpreterSetting setting : interpreterSettings.values()) { + if (setting.getName().equals(name)) { + return setting; + } + } + } + throw new RuntimeException("No such interpreter setting: " + name); + } + public ManagedInterpreterGroup getInterpreterGroupById(String groupId) { for (InterpreterSetting setting : interpreterSettings.values()) { ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index ca23bcf..35b6b6c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -254,7 +254,11 @@ public class RemoteInterpreterEventPoller extends Thread { try { clearUnreadEvents(interpreterProcess.getClient()); } catch (Exception e1) { - logger.error("Can't get RemoteInterpreterEvent", e1); + if (shutdown) { + logger.error("Can not get RemoteInterpreterEvent because it is shutdown."); + } else { + logger.error("Can't get RemoteInterpreterEvent", e1); + } } if (appendFuture != null) { appendFuture.cancel(true); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 2d64831..d21a962 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -226,6 +226,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } public void stop() { + // shutdown EventPoller first. 
+ this.remoteInterpreterEventPoller.shutdown(); if (callbackServer.isServing()) { callbackServer.stop(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index d34c538..e45f15b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -32,7 +32,7 @@ public abstract class RemoteInterpreterProcess { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private GenericObjectPool<Client> clientPool; - private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; + protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; private int connectTimeout; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java index 21d7526..9ab2137 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java @@ -25,14 +25,10 @@ import static org.mockito.Mockito.mock; */ public abstract class 
AbstractInterpreterTest { protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class); - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; protected InterpreterSettingManager interpreterSettingManager; protected InterpreterFactory interpreterFactory; - protected File testRootDir; + protected File zeppelinHome; protected File interpreterDir; protected File confDir; protected File notebookDir; @@ -41,12 +37,11 @@ public abstract class AbstractInterpreterTest { @Before public void setUp() throws Exception { // copy the resources files to a temp folder - testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis()); - testRootDir.mkdirs(); - LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath()); - interpreterDir = new File(testRootDir, "interpreter"); - confDir = new File(testRootDir, "conf"); - notebookDir = new File(testRootDir, "notebook"); + zeppelinHome = new File(".."); + LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath()); + interpreterDir = new File(zeppelinHome, "interpreter_" + getClass().getSimpleName()); + confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName()); + notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName()); interpreterDir.mkdirs(); confDir.mkdirs(); @@ -55,10 +50,10 @@ public abstract class AbstractInterpreterTest { FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir); FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath()); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath()); 
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath()); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT); conf = new ZeppelinConfiguration(); interpreterSettingManager = new InterpreterSettingManager(conf, @@ -69,6 +64,8 @@ public abstract class AbstractInterpreterTest { @After public void tearDown() throws Exception { interpreterSettingManager.close(); - FileUtils.deleteDirectory(testRootDir); + FileUtils.deleteDirectory(interpreterDir); + FileUtils.deleteDirectory(confDir); + FileUtils.deleteDirectory(notebookDir); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java new file mode 100644 index 0000000..619d01a --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java @@ -0,0 +1,114 @@ +package org.apache.zeppelin.interpreter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + + +/** + * + * Util class for creating a Mini Hadoop cluster in local machine to test scenarios that needs + * hadoop cluster. 
+ */
+public class MiniHadoopCluster {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MiniHadoopCluster.class);
+
+  /** Upper bound, in milliseconds, for the YARN-2642 wait loop in {@link #start()}. */
+  private static final long RM_STARTUP_TIMEOUT_MS = 30 * 1000;
+
+  private Configuration hadoopConf;
+  private MiniDFSCluster dfsCluster;
+  private MiniYARNCluster yarnCluster;
+  // Directory that receives the generated core-site.xml and yarn-site.xml.
+  private String configPath = new File("target/tests/hadoop_conf").getAbsolutePath();
+
+  /**
+   * Starts a MiniDFSCluster with 2 data nodes and then a MiniYARNCluster, saving the resulting
+   * configuration as {@code core-site.xml} and {@code yarn-site.xml} under
+   * {@link #getConfigPath()}.
+   *
+   * <p>This is an instance method that the owning test calls explicitly. It is deliberately not
+   * annotated with {@code @BeforeClass}: JUnit 4 only accepts that annotation on static methods.
+   *
+   * @throws IOException if a configuration file cannot be written, if the waiting thread is
+   *     interrupted, or if the ResourceManager address still has port 0 after the timeout
+   */
+  public void start() throws IOException {
+    LOGGER.info("Starting MiniHadoopCluster ...");
+    this.hadoopConf = new Configuration();
+    new File(configPath).mkdirs();
+    // start MiniDFSCluster
+    this.dfsCluster = new MiniDFSCluster.Builder(hadoopConf)
+        .numDataNodes(2)
+        .format(true)
+        .waitSafeMode(true)
+        .build();
+    this.dfsCluster.waitActive();
+    saveConfig(hadoopConf, configPath + "/core-site.xml");
+
+    // start MiniYarnCluster
+    YarnConfiguration baseConfig = new YarnConfiguration(hadoopConf);
+    this.yarnCluster = new MiniYARNCluster(getClass().getName(), 2,
+        1, 1);
+    yarnCluster.init(baseConfig);
+
+    // Install a shutdown hook that stops the YARN mini cluster when the JVM shuts down.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        yarnCluster.stop();
+      }
+    });
+
+    yarnCluster.start();
+
+    // Workaround for YARN-2642: poll until the RM address in the cluster configuration no
+    // longer carries the placeholder port 0.
+    Configuration yarnConfig = yarnCluster.getConfig();
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < RM_STARTUP_TIMEOUT_MS) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers further up can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+      if (!isRmPortUnassigned(yarnConfig)) {
+        break;
+      }
+    }
+    if (isRmPortUnassigned(yarnConfig)) {
+      throw new IOException("RM not up yet");
+    }
+
+    LOGGER.info("RM address in configuration is " + yarnConfig.get(YarnConfiguration.RM_ADDRESS));
+    saveConfig(yarnConfig,configPath + "/yarn-site.xml");
+  }
+
+  /**
+   * Tells whether the port part of {@code yarn.resourcemanager.address} is still "0".
+   *
+   * @param yarnConfig configuration obtained from the running MiniYARNCluster
+   * @return {@code true} while the text after the first ':' of the RM address equals "0"
+   */
+  private static boolean isRmPortUnassigned(Configuration yarnConfig) {
+    return yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0");
+  }
+
+  /**
+   * Writes a copy of {@code conf} as XML to {@code dest}.
+   *
+   * @param conf configuration to persist; it is copied first and is not modified itself
+   * @param dest path of the file to write
+   * @throws IOException if the file cannot be opened or written
+   */
+  protected void saveConfig(Configuration conf, String dest) throws IOException {
+    Configuration redacted = new Configuration(conf);
+    // This setting references a test class that is not available when using a real Spark
+    // installation, so remove it from client configs.
+    redacted.unset("net.topology.node.switch.mapping.impl");
+
+    FileOutputStream out = new FileOutputStream(dest);
+    try {
+      redacted.writeXml(out);
+    } finally {
+      out.close();
+    }
+    LOGGER.info("Save configuration to " + dest);
+  }
+
+  /**
+   * Stops the YARN cluster and then shuts down the DFS cluster, skipping whichever was never
+   * created. The DFS cluster is shut down even when stopping YARN throws.
+   *
+   * <p>Like {@link #start()}, this is called explicitly by the owning test and is not annotated
+   * with {@code @AfterClass}, which JUnit 4 only accepts on static methods.
+   */
+  public void stop() {
+    try {
+      if (this.yarnCluster != null) {
+        this.yarnCluster.stop();
+      }
+    } finally {
+      if (this.dfsCluster != null) {
+        this.dfsCluster.shutdown();
+      }
+    }
+  }
+
+  /** @return absolute path of the directory holding the generated Hadoop configuration files */
+  public String getConfigPath() {
+    return configPath;
+  }
+
+  /** @return the MiniYARNCluster created by {@link #start()}, or {@code null} before that */
+  public MiniYARNCluster getYarnCluster() {
+    return yarnCluster;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java new file mode 100644 index 0000000..923ae5a --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java @@ 
-0,0 +1,68 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test helper that builds an {@link InterpreterSettingManager} and an {@link InterpreterFactory}
+ * against the checkout located at {@code ..}, using dedicated conf and notebook directories
+ * that are created by {@link #start()} and removed by {@link #stop()}.
+ */
+public class MiniZeppelin {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(MiniZeppelin.class);
+
+  protected InterpreterSettingManager interpreterSettingManager;
+  protected InterpreterFactory interpreterFactory;
+  protected File zeppelinHome;
+  private File confDir;
+  private File notebookDir;
+  protected ZeppelinConfiguration conf;
+
+  /**
+   * Points the ZEPPELIN_HOME, ZEPPELIN_CONF_DIR and ZEPPELIN_NOTEBOOK_DIR system properties at
+   * {@code ..} and at freshly created {@code conf_<SimpleClassName>} /
+   * {@code notebook_<SimpleClassName>} directories below it, copies the two log4j property
+   * files from {@code conf/}, and then creates the configuration, the setting manager (with
+   * mocked listeners) and the interpreter factory.
+   *
+   * @throws IOException if a log4j file cannot be copied or the setting manager fails to load
+   */
+  public void start() throws IOException {
+    zeppelinHome = new File("..");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
+        zeppelinHome.getAbsolutePath());
+    confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
+    notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
+    confDir.mkdirs();
+    notebookDir.mkdirs();
+    LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
+    FileUtils.copyFile(new File(zeppelinHome, "conf/log4j.properties"), new File(confDir, "log4j.properties"));
+    FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+    conf = new ZeppelinConfiguration();
+    interpreterSettingManager = new InterpreterSettingManager(conf,
+        mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
+    interpreterFactory = new InterpreterFactory(interpreterSettingManager);
+  }
+
+  /**
+   * Closes the setting manager and deletes the conf and notebook directories.
+   *
+   * <p>Safe to call after a failed {@link #start()}: members that were never assigned are
+   * skipped, and both directories are still removed when an earlier step throws.
+   *
+   * @throws IOException if closing the setting manager or deleting a directory fails
+   */
+  public void stop() throws IOException {
+    try {
+      if (interpreterSettingManager != null) {
+        interpreterSettingManager.close();
+      }
+    } finally {
+      try {
+        if (confDir != null) {
+          FileUtils.deleteDirectory(confDir);
+        }
+      } finally {
+        if (notebookDir != null) {
+          FileUtils.deleteDirectory(notebookDir);
+        }
+      }
+    }
+  }
+
+  /** @return the directory used as ZEPPELIN_HOME, or {@code null} before {@link #start()} */
+  public File getZeppelinHome() {
+    return zeppelinHome;
+  }
+
+  /** @return the conf directory created by {@link #start()}, or {@code null} before that */
+  public File getZeppelinConfDir() {
+    return confDir;
+  }
+
+  /** @return the factory created by {@link #start()}, or {@code null} before that */
+  public InterpreterFactory getInterpreterFactory() {
+    return interpreterFactory;
+  }
+
+  /** @return the setting manager created by {@link #start()}, or {@code null} before that */
+  public InterpreterSettingManager getInterpreterSettingManager() {
+    return interpreterSettingManager;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java new file mode 100644 index 0000000..24a9aee --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java @@ -0,0 +1,147 @@ +package org.apache.zeppelin.interpreter; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.EnumSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SparkInterpreterModeTest { + + private static MiniHadoopCluster
hadoopCluster;
+  private static MiniZeppelin zeppelin;
+  private static InterpreterFactory interpreterFactory;
+  private static InterpreterSettingManager interpreterSettingManager;
+
+  /** Starts the mini Hadoop cluster first, then the mini Zeppelin whose factory the tests use. */
+  @BeforeClass
+  public static void setUp() throws IOException {
+    hadoopCluster = new MiniHadoopCluster();
+    hadoopCluster.start();
+
+    zeppelin = new MiniZeppelin();
+    zeppelin.start();
+    interpreterFactory = zeppelin.getInterpreterFactory();
+    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
+  }
+
+  /** Stops Zeppelin before the Hadoop cluster, skipping whichever was never created. */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (zeppelin != null) {
+      zeppelin.stop();
+    }
+    if (hadoopCluster != null) {
+      hadoopCluster.stop();
+    }
+  }
+
+  /**
+   * Runs one paragraph through each of the spark, pyspark, ipyspark and sql interpreters bound
+   * to user1/note1 and asserts that every paragraph succeeds. The pyspark paragraph registers
+   * the temp table {@code test} that the ipyspark and sql paragraphs read.
+   */
+  private void testInterpreterBasics() throws IOException {
+    // test SparkInterpreter
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
+
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
+    InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertTrue(interpreterResult.message().get(0).getData().contains("45"));
+
+    // test PySparkInterpreter
+    Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
+    interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test IPySparkInterpreter
+    Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
+    interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test SparkSQLInterpreter
+    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
+    interpreterResult = sqlInterpreter.interpret("select count(1) from test", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
+  }
+
+  /**
+   * Asserts how many applications the mini YARN ResourceManager reports in state RUNNING.
+   *
+   * @param expected number of RUNNING applications the caller expects
+   */
+  private void assertRunningYarnApplications(int expected) throws IOException, YarnException {
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(expected, response.getApplicationList().size());
+  }
+
+  /** With master {@code local[*]} the paragraphs succeed and no YARN application is running. */
+  @Test
+  public void testLocalMode() throws IOException, YarnException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "local[*]");
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+
+    try {
+      testInterpreterBasics();
+
+      // no yarn application launched
+      assertRunningYarnApplications(0);
+    } finally {
+      // Close even when an assertion fails so this test's interpreters do not leak into the
+      // next test.
+      interpreterSettingManager.close();
+    }
+  }
+
+  /** With master {@code yarn-client} the paragraphs succeed and one YARN application is running. */
+  @Test
+  public void testYarnClientMode() throws IOException, YarnException, InterruptedException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "yarn-client");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    try {
+      testInterpreterBasics();
+
+      // 1 yarn application launched
+      assertRunningYarnApplications(1);
+    } finally {
+      // Close even when an assertion fails; a leftover RUNNING application would break the
+      // application count asserted by the other tests.
+      interpreterSettingManager.close();
+    }
+  }
+
+  /** With master {@code yarn-cluster} the paragraphs succeed and one YARN application is running. */
+  @Test
+  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "yarn-cluster");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    try {
+      testInterpreterBasics();
+
+      // 1 yarn application launched
+      assertRunningYarnApplications(1);
+    } finally {
+      // Close even when an assertion fails; a leftover RUNNING application would break the
+      // application count asserted by the other tests.
+      interpreterSettingManager.close();
+    }
+  }
+
+  /**
+   * Resolves the python executable by running {@code which python}.
+   *
+   * @return the trimmed standard output of the command
+   * @throws RuntimeException if the command exits with a non-zero status
+   */
+  private String getPythonExec() throws IOException, InterruptedException {
+    Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
+    // Drain stdout before waiting: a child that fills the pipe buffer blocks until it is read,
+    // so waiting first could hang.
+    String output = IOUtils.toString(process.getInputStream());
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      throw new RuntimeException("Fail to run command: which python. Exit code: " + exitCode);
+    }
+    return output.trim();
+  }
+}