Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master a9d686b40 -> b4b4f5521


ZEPPELIN-262 Use spark-submit to run spark interpreter process

https://issues.apache.org/jira/browse/ZEPPELIN-262

This patch makes Zeppelin use spark-submit to run the Spark interpreter process
when SPARK_HOME is defined. This will potentially solve all of the configuration
problems related to the Spark interpreter.

#### How to use?

Define the SPARK_HOME env variable in conf/zeppelin-env.sh.
Zeppelin will then use your SPARK_HOME/bin/spark-submit, so you will not need
any additional configuration :-)
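
For example, a minimal conf/zeppelin-env.sh could look like the following
(the install path and option values below are illustrative, not defaults):

    # conf/zeppelin-env.sh -- illustrative values, adjust to your installation
    export SPARK_HOME=/opt/spark                 # hypothetical install location
    # optional extra flags forwarded to spark-submit
    export SPARK_SUBMIT_OPTIONS="--driver-memory 512M --executor-memory 1G"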

#### Backward compatibility

If you have not defined SPARK_HOME, you are still able to run the Spark
interpreter in the old (current) way. However, this is no longer encouraged.

Author: Lee moon soo <[email protected]>

Closes #270 from Leemoonsoo/spark_submit and squashes the following commits:

4eb0848 [Lee moon soo] export and check SPARK_SUBMIT
a8a3440 [Lee moon soo] handle spark.files correctly for pyspark when spark-submit is used
d4acd1b [Lee moon soo] Add PYTHONPATH
c9418c6 [Lee moon soo] Bring back some entries with more comments
cac2bb8 [Lee moon soo] Take care classpath of SparkIMain
5d3154e [Lee moon soo] Remove clean. otherwise mvn clean package will remove interpreter/spark/dep directory
2d27e9c [Lee moon soo] use spark-submit to run spark interpreter process when SPARK_HOME is defined


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/b4b4f552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/b4b4f552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/b4b4f552

Branch: refs/heads/master
Commit: b4b4f5521a57fd3b0902b5e3ab0e228c10b8bac5
Parents: a9d686b
Author: Lee moon soo <[email protected]>
Authored: Fri Sep 4 19:23:13 2015 -0700
Committer: Lee moon soo <[email protected]>
Committed: Mon Sep 7 21:40:58 2015 -0700

----------------------------------------------------------------------
 README.md                                       |  4 +-
 bin/interpreter.sh                              | 95 ++++++++------------
 conf/zeppelin-env.sh.template                   | 25 ++++--
 spark/pom.xml                                   | 41 ++++++---
 .../apache/zeppelin/spark/SparkInterpreter.java | 10 ++-
 5 files changed, 97 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index e7761f0..07c70ce 100644
--- a/README.md
+++ b/README.md
@@ -128,9 +128,7 @@ If you set `SPARK_HOME`, you should deploy spark binary on the same location to
 Yarn
 
     # ./conf/zeppelin-env.sh
-    export HADOOP_CONF_DIR=/path/to/hadoop_conf_dir
-  
-`HADOOP_CONF_DIR` should contains yarn-site.xml and core-site.xml.
+    export SPARK_HOME=/path/to/spark_dir
 
 ### Run
     ./bin/zeppelin-daemon.sh start

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 61dd249..e03a13b 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -72,77 +72,60 @@ fi
 
 # set spark related env variables
 if [[ "${INTERPRETER_ID}" == "spark" ]]; then
-  # add Hadoop jars into classpath
-  if [[ -n "${HADOOP_HOME}" ]]; then
-    # Apache
-    addEachJarInDir "${HADOOP_HOME}/share"
-
-    # CDH
-    addJarInDir "${HADOOP_HOME}"
-    addJarInDir "${HADOOP_HOME}/lib"
-  fi
+  if [[ -n "${SPARK_HOME}" ]]; then
+    export SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit"
+    SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/zeppelin-spark*.jar)"
+    # This will eventually pass SPARK_APP_JAR to the classpath of SparkIMain
+    ZEPPELIN_CLASSPATH=${SPARK_APP_JAR}
 
-  # autodetect HADOOP_CONF_HOME by heuristic
-  if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
-    if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
-      export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
-    elif [[ -d "/etc/hadoop/conf" ]]; then
-      export HADOOP_CONF_DIR="/etc/hadoop/conf"
+    export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
+    export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+  else
+    # add Hadoop jars into classpath
+    if [[ -n "${HADOOP_HOME}" ]]; then
+      # Apache
+      addEachJarInDir "${HADOOP_HOME}/share"
+
+      # CDH
+      addJarInDir "${HADOOP_HOME}"
+      addJarInDir "${HADOOP_HOME}/lib"
     fi
-  fi
 
-  if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
-    ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
-  fi
-
-  # add Spark jars into classpath
-  if [[ -n "${SPARK_HOME}" ]]; then
-    addJarInDir "${SPARK_HOME}/lib"
-    PYSPARKPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.8.2.1-src.zip"
-  else
     addJarInDir "${INTERPRETER_DIR}/dep"
     PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-0.8.2.1-src.zip"
-  fi
 
-  # autodetect SPARK_CONF_DIR
-  if [[ -n "${SPARK_HOME}" ]] && [[ -z "${SPARK_CONF_DIR}" ]]; then
-    if [[ -d "${SPARK_HOME}/conf" ]]; then
-      SPARK_CONF_DIR="${SPARK_HOME}/conf"
+    if [[ -z "${PYTHONPATH}" ]]; then
+      export PYTHONPATH="${PYSPARKPATH}"
+    else
+      export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
+    fi
+    unset PYSPARKPATH
+
+    # autodetect HADOOP_CONF_DIR by heuristic
+    if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
+      if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
+        export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+      elif [[ -d "/etc/hadoop/conf" ]]; then
+        export HADOOP_CONF_DIR="/etc/hadoop/conf"
+      fi
     fi
-  fi
 
-  # read spark-*.conf if exists
-  if [[ -d "${SPARK_CONF_DIR}" ]]; then
-    ls ${SPARK_CONF_DIR}/spark-*.conf > /dev/null 2>&1
-    if [[ "$?" -eq 0 ]]; then
-      for file in ${SPARK_CONF_DIR}/spark-*.conf; do
-        while read -r line; do
-          echo "${line}" | grep -e "^spark[.]" > /dev/null
-          if [ "$?" -ne 0 ]; then
-            # skip the line not started with 'spark.'
-            continue;
-          fi
-          SPARK_CONF_KEY=`echo "${line}" | sed -e 's/\(^spark[^ ]*\)[ \t]*\(.*\)/\1/g'`
-          SPARK_CONF_VALUE=`echo "${line}" | sed -e 's/\(^spark[^ ]*\)[ \t]*\(.*\)/\2/g'`
-          export ZEPPELIN_JAVA_OPTS+=" -D${SPARK_CONF_KEY}=\"${SPARK_CONF_VALUE}\""
-        done < "${file}"
-      done
+    if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+      ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
     fi
-  fi
 
-  if [[ -z "${PYTHONPATH}" ]]; then
-    export PYTHONPATH="${PYSPARKPATH}"
-  else
-    export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
+    export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
   fi
-
-  unset PYSPARKPATH
 fi
 
-export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
 CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
 
-${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
+if [[ -n "${SPARK_SUBMIT}" ]]; then
+    ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
+else
+    ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
+fi
+
 pid=$!
 if [[ -z "${pid}" ]]; then
   return 1;

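For illustration, with SPARK_HOME set the new launch branch above expands to
roughly the following command (the /opt/spark path is hypothetical, and
SPARK_APP_JAR is the shaded zeppelin-spark jar located earlier in the script):

    # sketch of the effective command line, not a verbatim trace
    /opt/spark/bin/spark-submit \
      --class "${ZEPPELIN_SERVER}" \
      --driver-class-path "${CLASSPATH}" \
      --driver-java-options "${JAVA_INTP_OPTS}" \
      ${SPARK_SUBMIT_OPTIONS} \
      "${ZEPPELIN_HOME}"/interpreter/spark/zeppelin-spark*.jar "${PORT}" &
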
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index a5beda7..fe133ba 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -33,14 +33,29 @@
 # export ZEPPELIN_IDENT_STRING                 # A string representing this instance of zeppelin. $USER by default.
 # export ZEPPELIN_NICENESS                     # The scheduling priority for daemons. Defaults to 0.
 
-# export ZEPPELIN_SPARK_USEHIVECONTEXT  # Use HiveContext instead of SQLContext if set true. true by default.
-# export ZEPPELIN_SPARK_CONCURRENTSQL   # Execute multiple SQL concurrently if set true. false by default.
-# export ZEPPELIN_SPARK_MAXRESULT       # Max number of SparkSQL result to display. 1000 by default.
 
+#### Spark interpreter configuration ####
+
+## Use provided spark installation ##
+## defining SPARK_HOME makes Zeppelin run the spark interpreter process using spark-submit
+##
+# export SPARK_HOME                             # (required) When it is defined, Zeppelin loads it instead of the embedded Spark libraries
+# export SPARK_SUBMIT_OPTIONS                   # (optional) extra options to pass to spark-submit. eg) "--driver-memory 512M --executor-memory 1G".
+
+## Use embedded spark binaries ##
+## without SPARK_HOME defined, Zeppelin is still able to run the spark interpreter process using embedded spark binaries.
+## however, this is not encouraged when you can define SPARK_HOME
+##
 # Options read in YARN client mode
 # export HADOOP_CONF_DIR                       # yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR.
-
 # Pyspark (supported with Spark 1.2.1 and above)
 # To configure pyspark, you need to set spark distribution's path to 'spark.home' property in Interpreter setting screen in Zeppelin GUI
 # export PYSPARK_PYTHON                        # path to the python command. must be the same path on the driver(Zeppelin) and all workers.
-# export PYTHONPATH                            # extra PYTHONPATH.
+# export PYTHONPATH  
+
+## Spark interpreter options ##
+##
+# export ZEPPELIN_SPARK_USEHIVECONTEXT  # Use HiveContext instead of SQLContext if set true. true by default.
+# export ZEPPELIN_SPARK_CONCURRENTSQL   # Execute multiple SQL concurrently if set true. false by default.
+# export ZEPPELIN_SPARK_MAXRESULT       # Max number of SparkSQL result to display. 1000 by default.
+

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 26b3b7f..6b17688 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -65,7 +65,6 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>zeppelin-interpreter</artifactId>
       <version>${project.version}</version>
-      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -357,23 +356,41 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <version>2.8</version>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <configuration>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <transformers>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+              <resource>reference.conf</resource>
+            </transformer>
+          </transformers>
+        </configuration>
         <executions>
           <execution>
-            <id>copy-dependencies</id>
             <phase>package</phase>
             <goals>
-              <goal>copy-dependencies</goal>
+              <goal>shade</goal>
             </goals>
-            <configuration>
-              <outputDirectory>${project.build.directory}/../../interpreter/spark</outputDirectory>
-              <overWriteReleases>false</overWriteReleases>
-              <overWriteSnapshots>false</overWriteSnapshots>
-              <overWriteIfNewer>true</overWriteIfNewer>
-              <includeScope>runtime</includeScope>
-            </configuration>
           </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
           <execution>
             <phase>package</phase>
             <goals>

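With the shade plugin in place, the spark module now builds a single
self-contained jar matching the zeppelin-spark*.jar glob that interpreter.sh
resolves into SPARK_APP_JAR; for example (the file name below is hypothetical):

    $ ls interpreter/spark/zeppelin-spark*.jar
    interpreter/spark/zeppelin-spark-0.6.0-incubating-SNAPSHOT.jar
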
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b4b4f552/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 82f8556..dfb846c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -281,7 +281,6 @@ public class SparkInterpreter extends Interpreter {
     }
 
     //TODO(jongyoul): Move these codes into PySparkInterpreter.java
-
     String pysparkBasePath = getSystemDefault("SPARK_HOME", "spark.home", null);
     File pysparkPath;
     if (null == pysparkBasePath) {
@@ -304,10 +303,13 @@ public class SparkInterpreter extends Interpreter {
     pythonLibUris.trimToSize();
     if (pythonLibs.length == pythonLibUris.size()) {
       conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
-      conf.set("spark.files", conf.get("spark.yarn.dist.files"));
+      if (!useSparkSubmit()) {
+        conf.set("spark.files", conf.get("spark.yarn.dist.files"));
+      }
       conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
     }
 
+
     SparkContext sparkContext = new SparkContext(conf);
     return sparkContext;
   }
@@ -316,6 +318,10 @@ public class SparkInterpreter extends Interpreter {
     return (o instanceof String) ? (String) o : "";
   }
 
+  private boolean useSparkSubmit() {
+    return null != System.getenv("SPARK_SUBMIT");
+  }
+
   public static String getSystemDefault(
       String envName,
       String propertyName,

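A note on how the two halves cooperate: interpreter.sh exports SPARK_SUBMIT
before launching the interpreter JVM, and useSparkSubmit() above keys off the
same variable. A minimal sketch of the handshake (paths are hypothetical):

    export SPARK_HOME=/opt/spark                          # set by the user
    export SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit"  # set by interpreter.sh
    # the interpreter JVM inherits SPARK_SUBMIT, so in SparkInterpreter
    # System.getenv("SPARK_SUBMIT") != null, useSparkSubmit() returns true,
    # and spark.files is left for spark-submit to manage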