Repository: incubator-zeppelin

Updated Branches:
  refs/heads/master a9d686b40 -> b4b4f5521
ZEPPELIN-262 Use spark-submit to run spark interpreter process

https://issues.apache.org/jira/browse/ZEPPELIN-262

This patch makes Zeppelin use spark-submit to run the spark interpreter process when SPARK_HOME is defined. This will potentially solve all of the configuration problems related to the spark interpreter.

#### How to use?

Define the SPARK_HOME env variable in conf/zeppelin-env.sh. Zeppelin will then use your SPARK_HOME/bin/spark-submit, so you will not need any additional configuration :-)

#### Backward compatibility

If you have not defined SPARK_HOME, you are still able to run the spark interpreter in the old (current) way. However, this is no longer encouraged.

Author: Lee moon soo <[email protected]>

Closes #270 from Leemoonsoo/spark_submit and squashes the following commits:

4eb0848 [Lee moon soo] export and check SPARK_SUBMIT
a8a3440 [Lee moon soo] handle spark.files correctly for pyspark when spark-submit is used
d4acd1b [Lee moon soo] Add PYTHONPATH
c9418c6 [Lee moon soo] Bring back some entries with more comments
cac2bb8 [Lee moon soo] Take care of classpath of SparkIMain
5d3154e [Lee moon soo] Remove clean. Otherwise mvn clean package will remove the interpreter/spark/dep directory
2d27e9c [Lee moon soo] use spark-submit to run spark interpreter process when SPARK_HOME is defined

Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/b4b4f552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/b4b4f552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/b4b4f552

Branch: refs/heads/master
Commit: b4b4f5521a57fd3b0902b5e3ab0e228c10b8bac5
Parents: a9d686b
Author: Lee moon soo <[email protected]>
Authored: Fri Sep 4 19:23:13 2015 -0700
Committer: Lee moon soo <[email protected]>
Committed: Mon Sep 7 21:40:58 2015 -0700

----------------------------------------------------------------------
 README.md                                       |  4 +-
 bin/interpreter.sh                              | 95 ++++++++------------
 conf/zeppelin-env.sh.template                   | 25 ++++--
 spark/pom.xml                                   | 41 ++++++---
 .../apache/zeppelin/spark/SparkInterpreter.java | 10 ++-
 5 files changed, 97 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index e7761f0..07c70ce 100644
--- a/README.md
+++ b/README.md
@@ -128,9 +128,7 @@ If you set `SPARK_HOME`, you should deploy spark binary on the same location to
 Yarn
 
     # ./conf/zeppelin-env.sh
-    export HADOOP_CONF_DIR=/path/to/hadoop_conf_dir
-
-`HADOOP_CONF_DIR` should contains yarn-site.xml and core-site.xml.
+    export SPARK_HOME=/path/to/spark_dir
 
 ### Run
 ./bin/zeppelin-daemon.sh start


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 61dd249..e03a13b 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -72,77 +72,60 @@ fi
 
 # set spark related env variables
 if [[ "${INTERPRETER_ID}" == "spark" ]]; then
-  # add Hadoop jars into classpath
-  if [[ -n "${HADOOP_HOME}" ]]; then
-    # Apache
-    addEachJarInDir "${HADOOP_HOME}/share"
-
-    # CDH
-    addJarInDir "${HADOOP_HOME}"
-    addJarInDir "${HADOOP_HOME}/lib"
-  fi
+  if [[ -n "${SPARK_HOME}" ]]; then
+    export SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit"
+    SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/zeppelin-spark*.jar)"
+    # This will eventually pass SPARK_APP_JAR to the classpath of SparkIMain
+    ZEPPELIN_CLASSPATH=${SPARK_APP_JAR}
 
-  # autodetect HADOOP_CONF_HOME by heuristic
-  if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
-    if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
-      export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
-    elif [[ -d "/etc/hadoop/conf" ]]; then
-      export HADOOP_CONF_DIR="/etc/hadoop/conf"
+    export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
+    export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+  else
+    # add Hadoop jars into classpath
+    if [[ -n "${HADOOP_HOME}" ]]; then
+      # Apache
+      addEachJarInDir "${HADOOP_HOME}/share"
+
+      # CDH
+      addJarInDir "${HADOOP_HOME}"
+      addJarInDir "${HADOOP_HOME}/lib"
     fi
-  fi
 
-  if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
-    ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
-  fi
-
-  # add Spark jars into classpath
-  if [[ -n "${SPARK_HOME}" ]]; then
-    addJarInDir "${SPARK_HOME}/lib"
-    PYSPARKPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.8.2.1-src.zip"
-  else
     addJarInDir "${INTERPRETER_DIR}/dep"
     PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-0.8.2.1-src.zip"
-  fi
 
-  # autodetect SPARK_CONF_DIR
-  if [[ -n "${SPARK_HOME}" ]] && [[ -z "${SPARK_CONF_DIR}" ]]; then
-    if [[ -d "${SPARK_HOME}/conf" ]]; then
-      SPARK_CONF_DIR="${SPARK_HOME}/conf"
+    if [[ -z "${PYTHONPATH}" ]]; then
+      export PYTHONPATH="${PYSPARKPATH}"
+    else
+      export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
+    fi
+    unset PYSPARKPATH
+
+    # autodetect HADOOP_CONF_HOME by heuristic
+    if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
+      if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
+        export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+      elif [[ -d "/etc/hadoop/conf" ]]; then
+        export HADOOP_CONF_DIR="/etc/hadoop/conf"
+      fi
     fi
-  fi
 
-  # read spark-*.conf if exists
-  if [[ -d "${SPARK_CONF_DIR}" ]]; then
-    ls ${SPARK_CONF_DIR}/spark-*.conf > /dev/null 2>&1
-    if [[ "$?" -eq 0 ]]; then
-      for file in ${SPARK_CONF_DIR}/spark-*.conf; do
-        while read -r line; do
-          echo "${line}" | grep -e "^spark[.]" > /dev/null
-          if [ "$?" -ne 0 ]; then
-            # skip the line not started with 'spark.'
-            continue;
-          fi
-          SPARK_CONF_KEY=`echo "${line}" | sed -e 's/\(^spark[^ ]*\)[ \t]*\(.*\)/\1/g'`
-          SPARK_CONF_VALUE=`echo "${line}" | sed -e 's/\(^spark[^ ]*\)[ \t]*\(.*\)/\2/g'`
-          export ZEPPELIN_JAVA_OPTS+=" -D${SPARK_CONF_KEY}=\"${SPARK_CONF_VALUE}\""
-        done < "${file}"
-      done
+    if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+      ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
     fi
-  fi
 
-  if [[ -z "${PYTHONPATH}" ]]; then
-    export PYTHONPATH="${PYSPARKPATH}"
-  else
-    export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
+    export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
   fi
-
-  unset PYSPARKPATH
 fi
 
-export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
 CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
 
-${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
+if [[ -n "${SPARK_SUBMIT}" ]]; then
+  ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
+else
+  ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
+fi
+
 pid=$!
 if [[ -z "${pid}" ]]; then
   return 1;


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index a5beda7..fe133ba 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -33,14 +33,29 @@
 # export ZEPPELIN_IDENT_STRING          # A string representing this instance of zeppelin. $USER by default.
 # export ZEPPELIN_NICENESS              # The scheduling priority for daemons. Defaults to 0.
 
-# export ZEPPELIN_SPARK_USEHIVECONTEXT  # Use HiveContext instead of SQLContext if set true. true by default.
-# export ZEPPELIN_SPARK_CONCURRENTSQL   # Execute multiple SQL concurrently if set true. false by default.
-# export ZEPPELIN_SPARK_MAXRESULT       # Max number of SparkSQL result to display. 1000 by default.
+#### Spark interpreter configuration ####
+
+## Use provided spark installation ##
+## defining SPARK_HOME makes Zeppelin run the spark interpreter process using spark-submit
+##
+# export SPARK_HOME                     # (required) When it is defined, load it instead of Zeppelin embedded Spark libraries
+# export SPARK_SUBMIT_OPTIONS           # (optional) extra options to pass to spark-submit. eg) "--driver-memory 512M --executor-memory 1G".
+
+## Use embedded spark binaries ##
+## without SPARK_HOME defined, Zeppelin is still able to run the spark interpreter process using embedded spark binaries.
+## however, it is not encouraged when you can define SPARK_HOME
+##
 # Options read in YARN client mode
 # export HADOOP_CONF_DIR                # yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR.
-
 # Pyspark (supported with Spark 1.2.1 and above)
 # To configure pyspark, you need to set spark distribution's path to 'spark.home' property in Interpreter setting screen in Zeppelin GUI
 # export PYSPARK_PYTHON                 # path to the python command. must be the same path on the driver(Zeppelin) and all workers.
-# export PYTHONPATH                     # extra PYTHONPATH.
+# export PYTHONPATH
+
+## Spark interpreter options ##
+##
+# export ZEPPELIN_SPARK_USEHIVECONTEXT  # Use HiveContext instead of SQLContext if set true. true by default.
+# export ZEPPELIN_SPARK_CONCURRENTSQL   # Execute multiple SQL concurrently if set true. false by default.
+# export ZEPPELIN_SPARK_MAXRESULT       # Max number of SparkSQL result to display. 1000 by default.
+


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 26b3b7f..6b17688 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -65,7 +65,6 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>zeppelin-interpreter</artifactId>
       <version>${project.version}</version>
-      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -357,23 +356,41 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <version>2.8</version>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <configuration>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <transformers>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+              <resource>reference.conf</resource>
+            </transformer>
+          </transformers>
+        </configuration>
         <executions>
           <execution>
-            <id>copy-dependencies</id>
             <phase>package</phase>
             <goals>
-              <goal>copy-dependencies</goal>
+              <goal>shade</goal>
             </goals>
-            <configuration>
-              <outputDirectory>${project.build.directory}/../../interpreter/spark</outputDirectory>
-              <overWriteReleases>false</overWriteReleases>
-              <overWriteSnapshots>false</overWriteSnapshots>
-              <overWriteIfNewer>true</overWriteIfNewer>
-              <includeScope>runtime</includeScope>
-            </configuration>
           </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
          <execution>
            <phase>package</phase>
            <goals>


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 82f8556..dfb846c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -281,7 +281,6 @@ public class SparkInterpreter extends Interpreter {
     }
 
     //TODO(jongyoul): Move these codes into PySparkInterpreter.java
-
     String pysparkBasePath = getSystemDefault("SPARK_HOME", "spark.home", null);
     File pysparkPath;
     if (null == pysparkBasePath) {
@@ -304,10 +303,13 @@ public class SparkInterpreter extends Interpreter {
     pythonLibUris.trimToSize();
     if (pythonLibs.length == pythonLibUris.size()) {
       conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
-      conf.set("spark.files", conf.get("spark.yarn.dist.files"));
+      if (!useSparkSubmit()) {
+        conf.set("spark.files", conf.get("spark.yarn.dist.files"));
+      }
       conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
     }
 
+
     SparkContext sparkContext = new SparkContext(conf);
     return sparkContext;
   }
@@ -316,6 +318,10 @@ public class SparkInterpreter extends Interpreter {
     return (o instanceof String) ? (String) o : "";
   }
 
+  private boolean useSparkSubmit() {
+    return null != System.getenv("SPARK_SUBMIT");
+  }
+
   public static String getSystemDefault(
       String envName,
       String propertyName,
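A minimal conf/zeppelin-env.sh for the new spark-submit code path might look like the sketch below. The /usr/lib/spark path, the memory values, and the HADOOP_CONF_DIR location are illustrative placeholders, not part of this commit; only SPARK_HOME is required by the new code path, while SPARK_SUBMIT_OPTIONS and HADOOP_CONF_DIR are optional.

    # conf/zeppelin-env.sh -- sketch only; adjust paths for your installation
    # Required: an existing Spark installation. bin/interpreter.sh will then
    # launch the interpreter process through ${SPARK_HOME}/bin/spark-submit.
    export SPARK_HOME=/usr/lib/spark

    # Optional: extra flags passed straight through to spark-submit.
    export SPARK_SUBMIT_OPTIONS="--driver-memory 512M --executor-memory 1G"

    # Optional: only needed when running Spark in YARN client mode.
    # export HADOOP_CONF_DIR=/etc/hadoop/conf

With SPARK_HOME unset, bin/interpreter.sh falls back to the embedded Spark binaries as before.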
