Repository: spark
Updated Branches:
  refs/heads/master 6de282e2d -> 971738936


[SPARK-6890] [core] Fix launcher lib to work with SPARK_PREPEND_CLASSES.

The fix for SPARK-6406 broke the case where sub-processes are launched
when SPARK_PREPEND_CLASSES is set, because the code would then add only
the launcher's build directory to the sub-process's classpath instead
of the complete assembly.

This patch fixes the problem by having the launch scripts stash the
assembly's location in an environment variable. This is not the prettiest
solution, but it avoids having to plumb that location all the way through
the Worker code that launches executors. The env variable is always
set by the launch scripts, so users cannot override it.
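
In other words, the scripts export the assembly's path in _SPARK_ASSEMBLY
and the command builder reads it back, falling back to scanning SPARK_HOME
only for embedded user code. A minimal self-contained sketch of that read
side (the class name and structure here are illustrative only, not the
launcher's actual API):

    import java.io.File;
    import java.io.FileFilter;
    import java.util.regex.Pattern;

    // Sketch only: mirrors the fallback order added to AbstractCommandBuilder
    // in the diff below. AssemblyResolverSketch is not a real Spark class.
    class AssemblyResolverSketch {
      static String resolveAssembly() {
        // Fast path: the launch scripts always export _SPARK_ASSEMBLY.
        String assembly = System.getenv("_SPARK_ASSEMBLY");
        if (assembly == null && System.getenv("SPARK_TESTING") == null) {
          // Embedded/user-code case: scan SPARK_HOME for the assembly jar.
          File libdir = new File(System.getenv("SPARK_HOME"), "lib");
          final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
          File[] jars = libdir.listFiles(new FileFilter() {
            @Override public boolean accept(File f) {
              return f.isFile() && re.matcher(f.getName()).matches();
            }
          });
          if (jars != null && jars.length == 1) {
            assembly = jars[0].getAbsolutePath();
          }
        }
        return assembly;  // may be null under SPARK_TESTING; tests set the classpath
      }
    }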

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #5504 from vanzin/SPARK-6890 and squashes the following commits:

7aec921 [Marcelo Vanzin] Fix tests.
ff87a60 [Marcelo Vanzin] Merge branch 'master' into SPARK-6890
31d3ce8 [Marcelo Vanzin] [SPARK-6890] [core] Fix launcher lib to work with SPARK_PREPEND_CLASSES.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97173893
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97173893
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97173893

Branch: refs/heads/master
Commit: 9717389365772d218cd7c67f9a13c3440f3c6791
Parents: 6de282e
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Apr 14 18:51:39 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Apr 14 18:51:39 2015 -0700

----------------------------------------------------------------------
 bin/spark-class                                 | 11 ++++-
 bin/spark-class2.cmd                            | 11 ++++-
 .../spark/launcher/AbstractCommandBuilder.java  | 44 ++++++++++++++++++--
 .../spark/launcher/CommandBuilderUtils.java     |  1 +
 .../SparkSubmitCommandBuilderSuite.java         | 15 ++++---
 5 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97173893/bin/spark-class
----------------------------------------------------------------------
diff --git a/bin/spark-class b/bin/spark-class
index c03946d..c49d97c 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -82,13 +82,22 @@ if [ $(command -v "$JAR_CMD") ] ; then
   fi
 fi
 
+LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
+
+# Add the launcher build dir to the classpath if requested.
+if [ -n "$SPARK_PREPEND_CLASSES" ]; then
+  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
+fi
+
+export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
+
 # The launcher library will print arguments separated by a NULL character, to allow arguments with
 # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
 # an array that will be used to exec the final command.
 CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
-done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
+done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
 
 if [ "${CMD[0]}" = "usage" ]; then
   "${CMD[@]}"

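The NULL-separator comment above describes the handshake between the script
and org.apache.spark.launcher.Main: the Java side prints each argument
followed by a NUL byte, and the bash loop reassembles them without any shell
re-interpretation. An illustrative emit-side sketch (not the actual Main
class; the command values are made up):

    import java.util.Arrays;
    import java.util.List;

    // Sketch only: emits output the "while IFS= read -d ''" loop can consume.
    class NulProtocolSketch {
      public static void main(String[] args) {
        List<String> cmd = Arrays.asList("/usr/bin/java", "-cp", "a.jar:b.jar",
            "org.example.Driver", "arg with spaces");
        for (String c : cmd) {
          System.out.print(c);
          System.out.print('\0');  // NUL survives spaces, quotes, and newlines
        }
      }
    }
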
http://git-wip-us.apache.org/repos/asf/spark/blob/97173893/bin/spark-class2.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index 4b3401d..3d068dd 100644
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -46,13 +46,22 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (
   exit /b 1
 )
 
+set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%
+
+rem Add the launcher build dir to the classpath if requested.
+if not "x%SPARK_PREPEND_CLASSES%"=="x" (
+  set LAUNCH_CLASSPATH=%SPARK_HOME%\launcher\target\scala-%SPARK_SCALA_VERSION%\classes;%LAUNCH_CLASSPATH%
+)
+
+set _SPARK_ASSEMBLY=%SPARK_ASSEMBLY_JAR%
+
 rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
 rem The launcher library prints the command to be executed in a single line suitable for being
 rem executed by the batch interpreter. So read all the output of the launcher into a variable.
-for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
+for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
   set SPARK_CMD=%%i
 )
 %SPARK_CMD%

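On Windows there is no NUL-read equivalent in the batch interpreter, which is
why the comment above says the launcher prints one line that "for /f" captures
whole. A rough illustration of that contract (the quoting here is a simplified,
hypothetical stand-in; real cmd.exe quoting has more edge cases):

    // Sketch only: builds the single line that the batch "for /f" loop reads.
    class WindowsLineSketch {
      // Hypothetical minimal quoting, for illustration only.
      static String quoteForBatch(String arg) {
        return arg.indexOf(' ') >= 0 ? "\"" + arg + "\"" : arg;
      }
      public static void main(String[] args) {
        String[] cmd = { "java", "-cp", "C:\\spark\\lib\\spark-assembly.jar",
            "org.example.Driver", "arg with spaces" };
        StringBuilder line = new StringBuilder();
        for (String c : cmd) {
          if (line.length() > 0) line.append(' ');
          line.append(quoteForBatch(c));
        }
        System.out.println(line.toString());  // the one line %SPARK_CMD% captures
      }
    }
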
http://git-wip-us.apache.org/repos/asf/spark/blob/97173893/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index d827914..b8f02b9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -186,12 +186,24 @@ abstract class AbstractCommandBuilder {
       addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
     }
 
-    final String assembly = AbstractCommandBuilder.class.getProtectionDomain().getCodeSource().
-       getLocation().getPath();
+    // We can't rely on the ENV_SPARK_ASSEMBLY variable to be set. Certain situations, such as
+    // when running unit tests, or user code that embeds Spark and creates a SparkContext
+    // with a local or local-cluster master, will cause this code to be called from an
+    // environment where that env variable is not guaranteed to exist.
+    //
+    // For the testing case, we rely on the test code to set and propagate the test classpath
+    // appropriately.
+    //
+    // For the user code case, we fall back to looking for the Spark assembly under SPARK_HOME.
+    // That duplicates some of the code in the shell scripts that look for the assembly, though.
+    String assembly = getenv(ENV_SPARK_ASSEMBLY);
+    if (assembly == null && isEmpty(getenv("SPARK_TESTING"))) {
+      assembly = findAssembly();
+    }
     addToClassPath(cp, assembly);
 
-    // Datanucleus jars must be included on the classpath. Datanucleus jars do not work if only 
-    // included in the uber jar as plugin.xml metadata is lost. Both sbt and maven will populate 
+    // Datanucleus jars must be included on the classpath. Datanucleus jars do not work if only
+    // included in the uber jar as plugin.xml metadata is lost. Both sbt and maven will populate
     // "lib_managed/jars/" with the datanucleus jars when Spark is built with Hive
     File libdir;
     if (new File(sparkHome, "RELEASE").isFile()) {
@@ -299,6 +311,30 @@ abstract class AbstractCommandBuilder {
     return firstNonEmpty(childEnv.get(key), System.getenv(key));
   }
 
+  private String findAssembly() {
+    String sparkHome = getSparkHome();
+    File libdir;
+    if (new File(sparkHome, "RELEASE").isFile()) {
+      libdir = new File(sparkHome, "lib");
+      checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+          libdir.getAbsolutePath());
+    } else {
+      libdir = new File(sparkHome, String.format("assembly/target/scala-%s", getScalaVersion()));
+    }
+
+    final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
+    FileFilter filter = new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.isFile() && re.matcher(file.getName()).matches();
+      }
+    };
+    File[] assemblies = libdir.listFiles(filter);
+    checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
+    checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
+    return assemblies[0].getAbsolutePath();
+  }
+
   private String getConfDir() {
     String confDir = getenv("SPARK_CONF_DIR");
     return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");

http://git-wip-us.apache.org/repos/asf/spark/blob/97173893/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index f4ebc25..8028e42 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -30,6 +30,7 @@ class CommandBuilderUtils {
   static final String DEFAULT_MEM = "512m";
   static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
   static final String ENV_SPARK_HOME = "SPARK_HOME";
+  static final String ENV_SPARK_ASSEMBLY = "_SPARK_ASSEMBLY";
 
   /** Returns whether the given string is null or empty. */
   static boolean isEmpty(String s) {

http://git-wip-us.apache.org/repos/asf/spark/blob/97173893/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index 626116a..97043a7 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -98,7 +98,7 @@ public class SparkSubmitCommandBuilderSuite {
       parser.NAME,
       "appName");
 
-    List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+    List<String> args = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
     List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
     assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
   }
@@ -110,7 +110,7 @@ public class SparkSubmitCommandBuilderSuite {
       parser.MASTER + "=foo",
       parser.DEPLOY_MODE + "=bar");
 
-    List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+    List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
     assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
     assertEquals("foo", findArgValue(cmd, parser.MASTER));
     assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
@@ -153,7 +153,7 @@ public class SparkSubmitCommandBuilderSuite {
     String deployMode = isDriver ? "client" : "cluster";
 
     SparkSubmitCommandBuilder launcher =
-      new SparkSubmitCommandBuilder(Collections.<String>emptyList());
+      newCommandBuilder(Collections.<String>emptyList());
     launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
       System.getProperty("spark.test.home"));
     launcher.master = "yarn";
@@ -273,10 +273,15 @@ public class SparkSubmitCommandBuilderSuite {
     return contains(needle, list.split(sep));
   }
 
-  private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+  private SparkSubmitCommandBuilder newCommandBuilder(List<String> args) {
     SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
     builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
-    return builder.buildCommand(env);
+    builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_ASSEMBLY, "dummy");
+    return builder;
+  }
+
+  private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+    return newCommandBuilder(args).buildCommand(env);
   }
 
 }
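
The reason the "dummy" entry works: getenv() in AbstractCommandBuilder checks
childEnv before the real environment, so ENV_SPARK_ASSEMBLY appears set and
findAssembly() is never reached. A hedged usage sketch of the helper above
(the submit arguments are made up; this would live inside the same test
package, since childEnv is package-private):

    // Sketch only: the "dummy" assembly short-circuits the SPARK_HOME scan.
    SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(
        Arrays.asList("--master", "local", "--class", "org.example.Main", "app.jar"));
    builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
        System.getProperty("spark.test.home"));
    builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_ASSEMBLY, "dummy");
    List<String> cmd = builder.buildCommand(new HashMap<String, String>());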

