(TWILL-115) Added support for passing in the YARN scheduler queue name when 
launching an application

- Also refactored the internal ProcessLauncher interface to simplify usage.

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/6bf8d95a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/6bf8d95a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/6bf8d95a

Branch: refs/heads/site
Commit: 6bf8d95a661b2241c50cd4c644f13f26c7cc2659
Parents: a125b7e
Author: Terence Yim <[email protected]>
Authored: Wed Feb 11 01:02:22 2015 -0800
Committer: Terence Yim <[email protected]>
Committed: Tue Feb 17 17:22:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillPreparer.java     |   8 ++
 .../apache/twill/internal/ProcessLauncher.java  |  70 ++++-------
 .../twill/internal/TwillContainerLauncher.java  |  43 ++-----
 .../internal/yarn/Hadoop20YarnAppClient.java    |  14 ++-
 .../internal/yarn/Hadoop21YarnAppClient.java    |  14 ++-
 .../yarn/AbstractYarnProcessLauncher.java       | 124 ++++++-------------
 .../twill/internal/yarn/YarnAppClient.java      |  17 ++-
 .../apache/twill/yarn/YarnTwillPreparer.java    |  16 +--
 8 files changed, 125 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java 
b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index 6caad70..c7ae5e5 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -45,6 +45,14 @@ public interface TwillPreparer {
   TwillPreparer setUser(String user);
 
   /**
+   * Sets the name of the scheduler queue to use.
+   *
+   * @param name Name of the scheduler queue
+   * @return This {@link TwillPreparer}.
+   */
+  TwillPreparer setSchedulerQueue(String name);
+
+  /**
    * This methods sets the extra JVM options that will be passed to the java 
command line for every runnable
    * of the application started through this {@link 
org.apache.twill.api.TwillPreparer} instance.
    *

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java 
b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
index e48a226..098b90e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
@@ -44,51 +44,29 @@ public interface ProcessLauncher<T> {
    */
   interface PrepareLaunchContext {
 
-    ResourcesAdder withResources();
-
-    AfterResources noResources();
-
-    interface ResourcesAdder {
-      MoreResources add(LocalFile localFile);
-    }
-
-    interface AfterResources {
-      EnvironmentAdder withEnvironment();
-
-      AfterEnvironment noEnvironment();
-    }
-
-    interface EnvironmentAdder {
-      <V> MoreEnvironment add(String key, V value);
-    }
-
-    interface MoreEnvironment extends EnvironmentAdder, AfterEnvironment {
-    }
-
-    interface AfterEnvironment {
-      CommandAdder withCommands();
-    }
-
-    interface MoreResources extends ResourcesAdder, AfterResources { }
-
-    interface CommandAdder {
-      StdOutSetter add(String cmd, String...args);
-    }
-
-    interface StdOutSetter {
-      StdErrSetter redirectOutput(String stdout);
-
-      StdErrSetter noOutput();
-    }
-
-    interface StdErrSetter {
-      MoreCommand redirectError(String stderr);
-
-      MoreCommand noError();
-    }
-
-    interface MoreCommand extends CommandAdder {
-      <R> ProcessController<R> launch();
-    }
+    /**
+     * Adds list of files to be localized for the container
+     */
+    PrepareLaunchContext addResources(LocalFile...localFiles);
+
+    /**
+     * Adds list of files to be localized for the container.
+     */
+    PrepareLaunchContext addResources(Iterable<LocalFile> localFiles);
+
+    /**
+     * Adds a key value pair to the container environment.
+     */
+    <V> PrepareLaunchContext addEnvironment(String key, V value);
+
+    /**
+     * Adds a command line to run in the container process.
+     */
+    PrepareLaunchContext addCommand(String cmd, String...args);
+
+    /**
+     * Launches the container process.
+     */
+    <R> ProcessController<R> launch();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 1688073..02606cd 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
-import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.filesystem.Location;
@@ -70,41 +69,25 @@ public final class TwillContainerLauncher {
   }
 
   public TwillContainerController start(RunId runId, int instanceId, Class<?> 
mainClass, String classPath) {
-    ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null;
-    ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null;
-
     // Clean up zookeeper path in case this is a retry and there are old 
messages and state there.
     Futures.getUnchecked(ZKOperations.ignoreError(
       ZKOperations.recursiveDelete(zkClient, "/" + runId), 
KeeperException.NoNodeException.class, null));
 
     // Adds all file to be localized to container
-    if (!runtimeSpec.getLocalFiles().isEmpty()) {
-      resourcesAdder = launchContext.withResources();
-
-      for (LocalFile localFile : runtimeSpec.getLocalFiles()) {
-        afterResources = resourcesAdder.add(localFile);
-      }
-    }
+    launchContext.addResources(runtimeSpec.getLocalFiles());
 
     // Optionally localize secure store.
     try {
       if (secureStoreLocation != null && secureStoreLocation.exists()) {
-        if (resourcesAdder == null) {
-          resourcesAdder = launchContext.withResources();
-        }
-        afterResources = resourcesAdder.add(new 
DefaultLocalFile(Constants.Files.CREDENTIALS,
-                                                                 
secureStoreLocation.toURI(),
-                                                                 
secureStoreLocation.lastModified(),
-                                                                 
secureStoreLocation.length(), false, null));
+        launchContext.addResources(new 
DefaultLocalFile(Constants.Files.CREDENTIALS,
+                                                        
secureStoreLocation.toURI(),
+                                                        
secureStoreLocation.lastModified(),
+                                                        
secureStoreLocation.length(), false, null));
       }
     } catch (IOException e) {
       LOG.warn("Failed to launch container with secure store {}.", 
secureStoreLocation);
     }
 
-    if (afterResources == null) {
-      afterResources = launchContext.noResources();
-    }
-
     int memory = runtimeSpec.getResourceSpecification().getMemorySize();
     if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
       // Reduce -Xmx by the reserved memory size.
@@ -115,12 +98,11 @@ public final class TwillContainerLauncher {
     }
 
     // Currently no reporting is supported for runnable containers
-    ProcessLauncher.PrepareLaunchContext.MoreEnvironment afterEnvironment = 
afterResources
-      .withEnvironment()
-      .add(EnvKeys.TWILL_RUN_ID, runId.getId())
-      .add(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
-      .add(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
-      .add(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount));
+    launchContext
+      .addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId())
+      .addEnvironment(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
+      .addEnvironment(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
+      .addEnvironment(EnvKeys.TWILL_INSTANCE_COUNT, 
Integer.toString(instanceCount));
 
     // assemble the command based on jvm options
     ImmutableList.Builder<String> commandBuilder = ImmutableList.builder();
@@ -157,9 +139,8 @@ public final class TwillContainerLauncher {
                        Boolean.TRUE.toString());
     List<String> command = commandBuilder.build();
 
-    ProcessController<Void> processController = afterEnvironment
-      .withCommands().add(firstCommand, command.toArray(new 
String[command.size()]))
-      .redirectOutput(Constants.STDOUT).redirectError(Constants.STDERR)
+    ProcessController<Void> processController = launchContext
+      .addCommand(firstCommand, command.toArray(new String[command.size()]))
       .launch();
 
     TwillContainerControllerImpl controller = new 
TwillContainerControllerImpl(zkClient, runId, processController);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 3f7422d..dfe4e67 100644
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  *
@@ -64,7 +65,8 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification 
twillSpec) throws Exception {
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification 
twillSpec,
+                                                       @Nullable String 
schedulerQueue) throws Exception {
     // Request for new application
     final GetNewApplicationResponse response = yarnClient.getNewApplication();
     final ApplicationId appId = response.getApplicationId();
@@ -75,6 +77,10 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
     appSubmissionContext.setApplicationName(twillSpec.getName());
     appSubmissionContext.setUser(user);
 
+    if (schedulerQueue != null) {
+      appSubmissionContext.setQueue(schedulerQueue);
+    }
+
     ApplicationSubmitter submitter = new ApplicationSubmitter() {
 
       @Override
@@ -147,9 +153,11 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user, 
TwillSpecification twillSpec) throws Exception {
+  public ProcessLauncher<ApplicationId> createLauncher(String user,
+                                                       TwillSpecification 
twillSpec,
+                                                       @Nullable String 
schedulerQueue) throws Exception {
     this.user = user;
-    return createLauncher(twillSpec);
+    return createLauncher(twillSpec, schedulerQueue);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
 
b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 9b70a86..ac126ce 100644
--- 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ 
b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  *
@@ -58,7 +59,8 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification 
twillSpec) throws Exception {
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification 
twillSpec,
+                                                       @Nullable String 
schedulerQueue) throws Exception {
     // Request for new application
     YarnClientApplication application = yarnClient.createApplication();
     final GetNewApplicationResponse response = 
application.getNewApplicationResponse();
@@ -69,6 +71,10 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
     appSubmissionContext.setApplicationId(appId);
     appSubmissionContext.setApplicationName(twillSpec.getName());
 
+    if (schedulerQueue != null) {
+      appSubmissionContext.setQueue(schedulerQueue);
+    }
+
     ApplicationSubmitter submitter = new ApplicationSubmitter() {
       @Override
       public ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
context, Resource capability) {
@@ -128,9 +134,11 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user, 
TwillSpecification twillSpec) throws Exception {
+  public ProcessLauncher<ApplicationId> createLauncher(String user,
+                                                       TwillSpecification 
twillSpec,
+                                                       @Nullable String 
schedulerQueue) throws Exception {
     // Ignore user
-    return createLauncher(twillSpec);
+    return createLauncher(twillSpec, schedulerQueue);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
index 1c28c47..ebbb559 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -30,6 +30,7 @@ import org.apache.twill.internal.utils.Paths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -113,108 +114,57 @@ public abstract class AbstractYarnProcessLauncher<T> 
implements ProcessLauncher<
     }
 
     @Override
-    public ResourcesAdder withResources() {
-      return new MoreResourcesImpl();
+    public PrepareLaunchContext addResources(LocalFile... localFiles) {
+      return addResources(Arrays.asList(localFiles));
     }
 
     @Override
-    public AfterResources noResources() {
-      return new MoreResourcesImpl();
-    }
-
-    private final class MoreResourcesImpl implements MoreResources {
-
-      @Override
-      public MoreResources add(LocalFile localFile) {
+    public PrepareLaunchContext addResources(Iterable<LocalFile> localFiles) {
+      for (LocalFile localFile : localFiles) {
         addLocalFile(localFile);
-        return this;
-      }
-
-      @Override
-      public EnvironmentAdder withEnvironment() {
-        return finish();
-      }
-
-      @Override
-      public AfterEnvironment noEnvironment() {
-        return finish();
-      }
-
-      private MoreEnvironmentImpl finish() {
-        launchContext.setLocalResources(localResources);
-        return new MoreEnvironmentImpl();
       }
+      return this;
     }
 
-    private final class MoreEnvironmentImpl implements MoreEnvironment {
-
-      @Override
-      public CommandAdder withCommands() {
-        launchContext.setEnvironment(environment);
-        return new MoreCommandImpl();
-      }
-
-      @Override
-      public <V> MoreEnvironment add(String key, V value) {
-        environment.put(key, value.toString());
-        return this;
-      }
+    @Override
+    public <V> PrepareLaunchContext addEnvironment(String key, V value) {
+      environment.put(key, value.toString());
+      return this;
     }
 
-    private final class MoreCommandImpl implements MoreCommand, StdOutSetter, 
StdErrSetter {
-
-      private final StringBuilder commandBuilder = new StringBuilder();
-
-      @Override
-      public StdOutSetter add(String cmd, String... args) {
-        commandBuilder.append(cmd);
-        for (String arg : args) {
-          commandBuilder.append(' ').append(arg);
-        }
-        return this;
-      }
-
-      @Override
-      public <R> ProcessController<R> launch() {
-        if (credentials != null && !credentials.getAllTokens().isEmpty()) {
-          for (Token<?> token : credentials.getAllTokens()) {
-            LOG.info("Launch with delegation token {}", token);
-          }
-          launchContext.setCredentials(credentials);
-        }
-        launchContext.setCommands(commands);
-        return doLaunch(launchContext);
+    @Override
+    public PrepareLaunchContext addCommand(String cmd, String... args) {
+      StringBuilder builder = new StringBuilder(cmd);
+      for (String arg : args) {
+        builder.append(' ').append(arg);
       }
 
-      @Override
-      public MoreCommand redirectError(String stderr) {
-        redirect(2, stderr);
-        return noError();
-      }
+      // Redirect stdout and stderr
+      redirect(1, ApplicationConstants.STDOUT, builder);
+      redirect(2, ApplicationConstants.STDERR, builder);
 
-      @Override
-      public MoreCommand noError() {
-        commands.add(commandBuilder.toString());
-        commandBuilder.setLength(0);
-        return this;
-      }
-
-      @Override
-      public StdErrSetter redirectOutput(String stdout) {
-        redirect(1, stdout);
-        return this;
-      }
+      commands.add(builder.toString());
+      return this;
+    }
 
-      @Override
-      public StdErrSetter noOutput() {
-        return this;
+    @Override
+    public <R> ProcessController<R> launch() {
+      launchContext.setLocalResources(localResources);
+      launchContext.setEnvironment(environment);
+      if (credentials != null && !credentials.getAllTokens().isEmpty()) {
+        for (Token<?> token : credentials.getAllTokens()) {
+          LOG.info("Launch with delegation token {}", token);
+        }
+        launchContext.setCredentials(credentials);
       }
+      launchContext.setCommands(commands);
+      return doLaunch(launchContext);
+    }
 
-      private void redirect(int type, String out) {
-        commandBuilder.append(' ')
-                      .append(type).append('>')
-                      
.append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out);
-      }
+    private void redirect(int type, String out, StringBuilder commandBuilder) {
+      commandBuilder.append(' ')
+        .append(type).append('>')
+        
.append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
index 3b03a2a..67a2292 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -25,6 +25,7 @@ import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
 
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  * Interface for launching Yarn application from client.
@@ -32,18 +33,26 @@ import java.util.List;
 public interface YarnAppClient extends Service {
 
   /**
-   * Creates a {@link ProcessLauncher} for launching the application 
represented by the given spec.
+   * Creates a {@link ProcessLauncher} for launching the application 
represented by the given spec. If scheduler queue
+   * is available and is supported by the YARN cluster, it will be launched in 
the given queue.
    */
-  ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) 
throws Exception;
+  ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
+                                                @Nullable String 
schedulerQueue) throws Exception;
 
   /**
-   * Creates a {@link ProcessLauncher} for launching application with the 
given user and spec.
+   * Creates a {@link ProcessLauncher} for launching application with the 
given user and spec. If scheduler queue
+   * is available and is supported by the YARN cluster, it will be launched in 
the given queue.
    *
    * @deprecated This method will get removed.
    */
   @Deprecated
-  ProcessLauncher<ApplicationId> createLauncher(String user, 
TwillSpecification twillSpec) throws Exception;
+  ProcessLauncher<ApplicationId> createLauncher(String user,
+                                                TwillSpecification twillSpec,
+                                                @Nullable String 
schedulerQueue) throws Exception;
 
+  /**
+   * Creates a {@link ProcessController} that can control an application 
represented by the given application id.
+   */
   ProcessController<YarnApplicationReport> 
createProcessController(ApplicationId appId);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java 
b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index e78889a..ee889a8 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -124,6 +124,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final Credentials credentials;
   private final int reservedMemory;
   private String user;
+  private String schedulerQueue;
   private String extraOptions;
   private JvmOptions.DebugOptions debugOptions = 
JvmOptions.DebugOptions.NO_DEBUG;
 
@@ -158,6 +159,12 @@ final class YarnTwillPreparer implements TwillPreparer {
   }
 
   @Override
+  public TwillPreparer setSchedulerQueue(String name) {
+    this.schedulerQueue = name;
+    return this;
+  }
+
+  @Override
   public TwillPreparer setJVMOptions(String options) {
     this.extraOptions = options;
     return this;
@@ -246,7 +253,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   @Override
   public TwillController start() {
     try {
-      final ProcessLauncher<ApplicationId> launcher = 
yarnAppClient.createLauncher(user, twillSpec);
+      final ProcessLauncher<ApplicationId> launcher = 
yarnAppClient.createLauncher(user, twillSpec, schedulerQueue);
       final ApplicationId appId = launcher.getContainerInfo();
 
       Callable<ProcessController<YarnApplicationReport>> submitTask =
@@ -291,10 +298,7 @@ final class YarnTwillPreparer implements TwillPreparer {
                         .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, 
yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS))
                         .build(),
             localFiles.values(), credentials
-          )
-            .noResources()
-            .noEnvironment()
-            .withCommands().add(
+          ).addCommand(
               "$JAVA_HOME/bin/java",
               "-Djava.io.tmpdir=tmp",
               "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
@@ -306,8 +310,6 @@ final class YarnTwillPreparer implements TwillPreparer {
               Constants.Files.APP_MASTER_JAR,
               ApplicationMasterMain.class.getName(),
               Boolean.FALSE.toString())
-            .redirectOutput(Constants.STDOUT)
-            .redirectError(Constants.STDERR)
             .launch();
         }
       };

Reply via email to