Repository: spark
Updated Branches:
  refs/heads/branch-1.5 1067c7369 -> 18d78a823


[SPARK-9074] [LAUNCHER] Allow arbitrary Spark args to be set.

This change allows any Spark argument to be added to the app to
be started using SparkLauncher. Known arguments are properly
validated, while unknown arguments are allowed so that the
library can launch newer Spark versions (in case SPARK_HOME points
at one).

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #7975 from vanzin/SPARK-9074 and squashes the following commits:

b5e451a [Marcelo Vanzin] [SPARK-9074] [launcher] Allow arbitrary Spark args to 
be set.

(cherry picked from commit 5a5bbc29961630d649d4bd4acd5d19eb537b5fd0)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18d78a82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18d78a82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18d78a82

Branch: refs/heads/branch-1.5
Commit: 18d78a8239e62ea0fe7952b7952cfb72bdc14a18
Parents: 1067c73
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Aug 11 16:33:08 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Aug 11 16:33:21 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/launcher/SparkLauncher.java    | 101 ++++++++++++++++++-
 .../launcher/SparkSubmitCommandBuilder.java     |   2 +-
 .../spark/launcher/SparkLauncherSuite.java      |  50 +++++++++
 3 files changed, 150 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18d78a82/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java 
b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index c0f89c9..03c9358 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -20,12 +20,13 @@ package org.apache.spark.launcher;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
 
-/** 
+/**
  * Launcher for Spark applications.
  * <p>
  * Use this class to start Spark applications programmatically. The class uses 
a builder pattern
@@ -57,7 +58,8 @@ public class SparkLauncher {
   /** Configuration key for the number of executor CPU cores. */
   public static final String EXECUTOR_CORES = "spark.executor.cores";
 
-  private final SparkSubmitCommandBuilder builder;
+  // Visible for testing.
+  final SparkSubmitCommandBuilder builder;
 
   public SparkLauncher() {
     this(null);
@@ -188,6 +190,73 @@ public class SparkLauncher {
   }
 
   /**
+   * Adds a no-value argument to the Spark invocation. If the argument is 
known, this method
+   * validates whether the argument is indeed a no-value argument, and throws 
an exception
+   * otherwise.
+   * <p/>
+   * Use this method with caution. It is possible to create an invalid Spark 
command by passing
+   * unknown arguments to this method, since those are allowed for forward 
compatibility.
+   *
+   * @param arg Argument to add.
+   * @return This launcher.
+   */
+  public SparkLauncher addSparkArg(String arg) {
+    SparkSubmitOptionParser validator = new ArgumentValidator(false);
+    validator.parse(Arrays.asList(arg));
+    builder.sparkArgs.add(arg);
+    return this;
+  }
+
+  /**
+   * Adds an argument with a value to the Spark invocation. If the argument 
name corresponds to
+   * a known argument, the code validates that the argument actually expects a 
value, and throws
+   * an exception otherwise.
+   * <p/>
+   * It is safe to add arguments modified by other methods in this class (such 
as
+   * {@link #setMaster(String)}) - the last invocation will be the one to take 
effect.
+   * <p/>
+   * Use this method with caution. It is possible to create an invalid Spark 
command by passing
+   * unknown arguments to this method, since those are allowed for forward 
compatibility.
+   *
+   * @param name Name of argument to add.
+   * @param value Value of the argument.
+   * @return This launcher.
+   */
+  public SparkLauncher addSparkArg(String name, String value) {
+    SparkSubmitOptionParser validator = new ArgumentValidator(true);
+    if (validator.MASTER.equals(name)) {
+      setMaster(value);
+    } else if (validator.PROPERTIES_FILE.equals(name)) {
+      setPropertiesFile(value);
+    } else if (validator.CONF.equals(name)) {
+      String[] vals = value.split("=", 2);
+      setConf(vals[0], vals[1]);
+    } else if (validator.CLASS.equals(name)) {
+      setMainClass(value);
+    } else if (validator.JARS.equals(name)) {
+      builder.jars.clear();
+      for (String jar : value.split(",")) {
+        addJar(jar);
+      }
+    } else if (validator.FILES.equals(name)) {
+      builder.files.clear();
+      for (String file : value.split(",")) {
+        addFile(file);
+      }
+    } else if (validator.PY_FILES.equals(name)) {
+      builder.pyFiles.clear();
+      for (String file : value.split(",")) {
+        addPyFile(file);
+      }
+    } else {
+      validator.parse(Arrays.asList(name, value));
+      builder.sparkArgs.add(name);
+      builder.sparkArgs.add(value);
+    }
+    return this;
+  }
+
+  /**
    * Adds command line arguments for the application.
    *
    * @param args Arguments to pass to the application's main class.
@@ -277,4 +346,32 @@ public class SparkLauncher {
     return pb.start();
   }
 
+  private static class ArgumentValidator extends SparkSubmitOptionParser {
+
+    private final boolean hasValue;
+
+    ArgumentValidator(boolean hasValue) {
+      this.hasValue = hasValue;
+    }
+
+    @Override
+    protected boolean handle(String opt, String value) {
+      if (value == null && hasValue) {
+        throw new IllegalArgumentException(String.format("'%s' does not expect 
a value.", opt));
+      }
+      return true;
+    }
+
+    @Override
+    protected boolean handleUnknown(String opt) {
+      // Do not fail on unknown arguments, to support future arguments added 
to SparkSubmit.
+      return true;
+    }
+
+    @Override
+    protected void handleExtraArgs(List<String> extra) {
+      // No op.
+    }
+
+  };
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18d78a82/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 87c43aa..4f354ce 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -76,7 +76,7 @@ class SparkSubmitCommandBuilder extends 
AbstractCommandBuilder {
       "spark-internal");
   }
 
-  private final List<String> sparkArgs;
+  final List<String> sparkArgs;
   private final boolean printHelp;
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/18d78a82/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 252d5ab..d0c26dd 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.launcher;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -36,7 +37,53 @@ public class SparkLauncherSuite {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkLauncherSuite.class);
 
   @Test
+  public void testSparkArgumentHandling() throws Exception {
+    SparkLauncher launcher = new SparkLauncher()
+      .setSparkHome(System.getProperty("spark.test.home"));
+    SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
+
+    launcher.addSparkArg(opts.HELP);
+    try {
+      launcher.addSparkArg(opts.PROXY_USER);
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException e) {
+      // Expected.
+    }
+
+    launcher.addSparkArg(opts.PROXY_USER, "someUser");
+    try {
+      launcher.addSparkArg(opts.HELP, "someValue");
+      fail("Expected IllegalArgumentException.");
+    } catch (IllegalArgumentException e) {
+      // Expected.
+    }
+
+    launcher.addSparkArg("--future-argument");
+    launcher.addSparkArg("--future-argument", "someValue");
+
+    launcher.addSparkArg(opts.MASTER, "myMaster");
+    assertEquals("myMaster", launcher.builder.master);
+
+    launcher.addJar("foo");
+    launcher.addSparkArg(opts.JARS, "bar");
+    assertEquals(Arrays.asList("bar"), launcher.builder.jars);
+
+    launcher.addFile("foo");
+    launcher.addSparkArg(opts.FILES, "bar");
+    assertEquals(Arrays.asList("bar"), launcher.builder.files);
+
+    launcher.addPyFile("foo");
+    launcher.addSparkArg(opts.PY_FILES, "bar");
+    assertEquals(Arrays.asList("bar"), launcher.builder.pyFiles);
+
+    launcher.setConf("spark.foo", "foo");
+    launcher.addSparkArg(opts.CONF, "spark.foo=bar");
+    assertEquals("bar", launcher.builder.conf.get("spark.foo"));
+  }
+
+  @Test
   public void testChildProcLauncher() throws Exception {
+    SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
     Map<String, String> env = new HashMap<String, String>();
     env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
 
@@ -44,9 +91,12 @@ public class SparkLauncherSuite {
       .setSparkHome(System.getProperty("spark.test.home"))
       .setMaster("local")
       .setAppResource("spark-internal")
+      .addSparkArg(opts.CONF,
+        String.format("%s=-Dfoo=ShouldBeOverriddenBelow", 
SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
       .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
         "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
       .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, 
System.getProperty("java.class.path"))
+      .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
       .setMainClass(SparkLauncherTestApp.class.getName())
       .addAppArgs("proc");
     final Process app = launcher.launch();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to