Repository: spark
Updated Branches:
  refs/heads/master 2ae7b88a0 -> 004e29cba


[SPARK-14702] Make the environment of SparkLauncher-launched processes more configurable

## What changes were proposed in this pull request?

Adds a few public methods to `SparkLauncher` that expose more of the underlying
`ProcessBuilder` configuration: the working directory of the child process, and
redirection of its stdout/stderr (to a file, to another `ProcessBuilder.Redirect`,
or to a logger).

## How was this patch tested?

Unit tests, plus simple Spark driver programs.

Author: Andrew Duffy <r...@aduffy.org>

Closes #14201 from andreweduffy/feature/launcher.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/004e29cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/004e29cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/004e29cb

Branch: refs/heads/master
Commit: 004e29cba518684d239d2d1661dce7c894a79f14
Parents: 2ae7b88
Author: Andrew Duffy <r...@aduffy.org>
Authored: Tue Jul 19 17:08:38 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Jul 19 17:08:38 2016 -0700

----------------------------------------------------------------------
 .../spark/launcher/SparkLauncherSuite.java      |  67 +++++++-
 .../spark/launcher/ChildProcAppHandle.java      |   5 +-
 .../apache/spark/launcher/SparkLauncher.java    | 167 ++++++++++++++++---
 3 files changed, 208 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/004e29cb/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 8ca54b2..e393db0 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,10 +41,15 @@ public class SparkLauncherSuite {
   private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
   private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");

+  private SparkLauncher launcher;  // recreated before every test by configureLauncher()
+
+  @Before
+  public void configureLauncher() {
+    launcher = new SparkLauncher().setSparkHome(System.getProperty("spark.test.home"));
+  }
+
   @Test
   public void testSparkArgumentHandling() throws Exception {
-    SparkLauncher launcher = new SparkLauncher()
-      .setSparkHome(System.getProperty("spark.test.home"));
     SparkSubmitOptionParser opts = new SparkSubmitOptionParser();

     launcher.addSparkArg(opts.HELP);
@@ -85,14 +91,67 @@ public class SparkLauncherSuite {
     assertEquals("bar", launcher.builder.conf.get("spark.foo"));
   }

+  @Test(expected=IllegalStateException.class)
+  public void testRedirectTwiceFails() throws Exception {
+    launcher.setAppResource("fake-resource.jar")
+      .setMainClass("my.fake.class.Fake")
+      .redirectError()
+      .redirectError(ProcessBuilder.Redirect.PIPE)
+      .launch();
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testRedirectToLogWithOthersFails() throws Exception {
+    launcher.setAppResource("fake-resource.jar")
+      .setMainClass("my.fake.class.Fake")
+      .redirectToLog("fakeLog")
+      .redirectError(ProcessBuilder.Redirect.PIPE)
+      .launch();
+  }
+
+  @Test
+  public void testRedirectErrorToOutput() throws Exception {
+    launcher.redirectError();
+    assertTrue(launcher.redirectErrorStream);
+  }
+
+  @Test
+  public void testRedirectsSimple() throws Exception {
+    launcher.redirectError(ProcessBuilder.Redirect.PIPE);
+    assertNotNull(launcher.errorStream);
+    assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.errorStream.type());
+
+    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
+    assertNotNull(launcher.outputStream);
+    assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.outputStream.type());
+  }
+
+  @Test
+  public void testRedirectLastWins() throws Exception {
+    launcher.redirectError(ProcessBuilder.Redirect.PIPE)
+      .redirectError(ProcessBuilder.Redirect.INHERIT);
+    assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.errorStream.type());
+
+    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
+      .redirectOutput(ProcessBuilder.Redirect.INHERIT);
+    assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.outputStream.type());
+  }
+
+  @Test
+  public void testRedirectToLog() throws Exception {
+    launcher.redirectToLog("fakeLogger");
+    assertTrue(launcher.redirectToLog);
+    assertEquals("fakeLogger",
+      launcher.builder.getEffectiveConfig().get(SparkLauncher.CHILD_PROCESS_LOGGER_NAME));
+  }
+
   @Test
   public void testChildProcLauncher() throws Exception {
     SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
     Map<String, String> env = new HashMap<>();
     env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");

-    SparkLauncher launcher = new SparkLauncher(env)
-      .setSparkHome(System.getProperty("spark.test.home"))
+    launcher = new SparkLauncher(env).setSparkHome(System.getProperty("spark.test.home"))
       .setMaster("local")
       .setAppResource(SparkLauncher.NO_RESOURCE)
       .addSparkArg(opts.CONF,

http://git-wip-us.apache.org/repos/asf/spark/blob/004e29cb/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 1bfda28..c0779e1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,8 +30,6 @@ import java.util.logging.Logger;
 class ChildProcAppHandle implements SparkAppHandle {

   private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
-  private static final ThreadFactory REDIRECTOR_FACTORY =
-    new NamedThreadFactory("launcher-proc-%d");

   private final String secret;
   private final LauncherServer server;
@@ -127,7 +124,7 @@ class ChildProcAppHandle implements SparkAppHandle {
   void setChildProc(Process childProc, String loggerName) {
     this.childProc = childProc;
     this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
-      REDIRECTOR_FACTORY);
+      SparkLauncher.REDIRECTOR_FACTORY);
   }

   void setConnection(LauncherConnection connection) {

http://git-wip-us.apache.org/repos/asf/spark/blob/004e29cb/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java 
b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 08873f5..41f7f1f 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -82,6 +83,9 @@ public class SparkLauncher {
   /** Used internally to create unique logger names. */
   private static final AtomicInteger COUNTER = new AtomicInteger();

+  /** Thread factory for OutputRedirector threads; shared with ChildProcAppHandle. */
+  static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
+
   static final Map<String, String> launcherConfig = new HashMap<>();

   /**
@@ -99,6 +103,11 @@ public class SparkLauncher {

   // Visible for testing.
   final SparkSubmitCommandBuilder builder;
+  File workingDir;  // spark-submit working dir; null keeps the ProcessBuilder default
+  boolean redirectToLog;  // set by redirectToLog(String)
+  boolean redirectErrorStream;  // set by redirectError(): merge stderr into stdout
+  ProcessBuilder.Redirect errorStream;  // explicit stderr destination; null if not configured
+  ProcessBuilder.Redirect outputStream;  // explicit stdout destination; null if not configured

   public SparkLauncher() {
     this(null);
@@ -359,6 +368,83 @@ public class SparkLauncher {
   }

   /**
+   * Sets the directory in which the spark-submit process is started.
+   *
+   * @param dir Working directory for the spark-submit child process.
+   * @return This launcher.
+   */
+  public SparkLauncher directory(File dir) {
+    this.workingDir = dir;
+    return this;
+  }
+
+  /**
+   * Requests that spark-submit's stderr be merged into its stdout.
+   *
+   * @return This launcher.
+   */
+  public SparkLauncher redirectError() {
+    this.redirectErrorStream = true;
+    return this;
+  }
+
+  /**
+   * Sends spark-submit's stderr to the given destination.
+   *
+   * @param to Where stderr should go.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
+    this.errorStream = to;
+    return this;
+  }
+
+  /**
+   * Sends spark-submit's stdout to the given destination.
+   *
+   * @param to Where stdout should go.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
+    this.outputStream = to;
+    return this;
+  }
+
+  /**
+   * Writes spark-submit's stderr to the given file.
+   *
+   * @param errFile The file that receives stderr.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectError(File errFile) {
+    ProcessBuilder.Redirect target = ProcessBuilder.Redirect.to(errFile);
+    return redirectError(target);
+  }
+
+  /**
+   * Redirects standard output to the specified File.
+   *
+   * @param outFile The file to which stdout is written.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectOutput(File outFile) {
+    outputStream = ProcessBuilder.Redirect.to(outFile);
+    return this;
+  }
+
+  /**
+   * Merges spark-submit's stderr into its stdout and forwards both to the named logger.
+   *
+   * @param loggerName Logger name; also stored as the {@link #CHILD_PROCESS_LOGGER_NAME} setting.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectToLog(String loggerName) {
+    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
+    redirectToLog = true;
+    return this;
+  }
+
+  /**
    * Launches a sub-process that will start the configured Spark application.
    * <p>
    * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
@@ -367,7 +453,12 @@ public class SparkLauncher {
    * @return A process handle for the Spark app.
    */
   public Process launch() throws IOException {
-    return createBuilder().start();
+    Process childProc = createBuilder().start();
+    if (redirectToLog) {  // createBuilder() already merged stderr into stdout in this case
+      String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+      new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
+    }
+    return childProc;
   }

   /**
@@ -383,12 +474,13 @@ public class SparkLauncher {
    * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
    * <p>
    * Currently, all applications are launched as child processes. The child's stdout and stderr
-   * are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
-   * can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
-   * that option is not set, the code will try to derive a name from the application's name or
-   * main class / script file. If those cannot be determined, an internal, unique name will be
-   * used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
-   * more easily into the configuration of commonly-used logging systems.
+   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
+   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
+   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
+   * option is not set, the code will try to derive a name from the application's name or main
+   * class / script file. If those cannot be determined, an internal, unique name will be used.
+   * A derived logger name always starts with "org.apache.spark.launcher.app", to fit more
+   * easily into the configuration of commonly-used logging systems.
    *
    * @since 1.6.0
    * @param listeners Listeners to add to the handle before the app is launched.
@@ -400,27 +492,36 @@ public class SparkLauncher {
       handle.addListener(l);
     }

-    String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
-    if (appName == null) {
+    String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+    if (loggerName == null) {
+      // No explicit logger name: derive one from the app name, main class or app resource,
+      // falling back to a unique counter value.
+      String appName;
       if (builder.appName != null) {
         appName = builder.appName;
       } else if (builder.mainClass != null) {
         int dot = builder.mainClass.lastIndexOf(".");
         if (dot >= 0 && dot < builder.mainClass.length() - 1) {
           appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
         } else {
           appName = builder.mainClass;
         }
       } else if (builder.appResource != null) {
         appName = new File(builder.appResource).getName();
       } else {
         appName = String.valueOf(COUNTER.incrementAndGet());
       }
+      String loggerPrefix = getClass().getPackage().getName();
+      loggerName = String.format("%s.app.%s", loggerPrefix, appName);
     }

-    String loggerPrefix = getClass().getPackage().getName();
-    String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
-    ProcessBuilder pb = createBuilder().redirectErrorStream(true);
+    ProcessBuilder pb = createBuilder();
+    // ChildProcAppHandle forwards the child's stdout to the logger. Unless the user chose an
+    // explicit stderr destination, merge stderr into stdout so it is logged too: an unread
+    // stderr pipe can block the child once the OS pipe buffer fills up.
+    if (errorStream == null) {
+      pb.redirectErrorStream(true);
+    }
     pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
       String.valueOf(LauncherServer.getServerInstance().getPort()));
     pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
@@ -455,6 +553,29 @@ public class SparkLauncher {
     for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
       pb.environment().put(e.getKey(), e.getValue());
     }
+
+    if (workingDir != null) {
+      pb.directory(workingDir);
+    }
+
+    // Only one of redirectError() and redirectError(...) can be specified.
+    // Similarly, if redirectToLog is specified, no other redirections should be specified.
+    checkState(!redirectErrorStream || errorStream == null,
+      "Cannot specify both redirectError() and redirectError(...)");
+    checkState(!redirectToLog ||
+      (!redirectErrorStream && errorStream == null && outputStream == null),
+      "Cannot use redirectToLog() in conjunction with other redirection methods.");
+
+    if (redirectErrorStream || redirectToLog) {
+      pb.redirectErrorStream(true);
+    }
+    if (errorStream != null) {
+      pb.redirectError(errorStream);
+    }
+    if (outputStream != null) {
+      pb.redirectOutput(outputStream);
+    }
+
     return pb;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to