Repository: incubator-twill Updated Branches: refs/heads/master a125b7e12 -> 6bf8d95a6
(TWILL-115) Added support for passing in Yarn schedule queue name when launching an application - Also refactor the internal ProcessLauncher class to simplify usage. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/6bf8d95a Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/6bf8d95a Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/6bf8d95a Branch: refs/heads/master Commit: 6bf8d95a661b2241c50cd4c644f13f26c7cc2659 Parents: a125b7e Author: Terence Yim <[email protected]> Authored: Wed Feb 11 01:02:22 2015 -0800 Committer: Terence Yim <[email protected]> Committed: Tue Feb 17 17:22:35 2015 -0800 ---------------------------------------------------------------------- .../org/apache/twill/api/TwillPreparer.java | 8 ++ .../apache/twill/internal/ProcessLauncher.java | 70 ++++------- .../twill/internal/TwillContainerLauncher.java | 43 ++----- .../internal/yarn/Hadoop20YarnAppClient.java | 14 ++- .../internal/yarn/Hadoop21YarnAppClient.java | 14 ++- .../yarn/AbstractYarnProcessLauncher.java | 124 ++++++------------- .../twill/internal/yarn/YarnAppClient.java | 17 ++- .../apache/twill/yarn/YarnTwillPreparer.java | 16 +-- 8 files changed, 125 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index 6caad70..c7ae5e5 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -45,6 +45,14 @@ public interface TwillPreparer { 
TwillPreparer setUser(String user); /** + * Sets the name of the scheduler queue to use. + * + * @param name Name of the scheduler queue + * @return This {@link TwillPreparer}. + */ + TwillPreparer setSchedulerQueue(String name); + + /** * This methods sets the extra JVM options that will be passed to the java command line for every runnable * of the application started through this {@link org.apache.twill.api.TwillPreparer} instance. * http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java index e48a226..098b90e 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java @@ -44,51 +44,29 @@ public interface ProcessLauncher<T> { */ interface PrepareLaunchContext { - ResourcesAdder withResources(); - - AfterResources noResources(); - - interface ResourcesAdder { - MoreResources add(LocalFile localFile); - } - - interface AfterResources { - EnvironmentAdder withEnvironment(); - - AfterEnvironment noEnvironment(); - } - - interface EnvironmentAdder { - <V> MoreEnvironment add(String key, V value); - } - - interface MoreEnvironment extends EnvironmentAdder, AfterEnvironment { - } - - interface AfterEnvironment { - CommandAdder withCommands(); - } - - interface MoreResources extends ResourcesAdder, AfterResources { } - - interface CommandAdder { - StdOutSetter add(String cmd, String...args); - } - - interface StdOutSetter { - StdErrSetter redirectOutput(String stdout); - - StdErrSetter noOutput(); - } - - interface StdErrSetter { - MoreCommand redirectError(String stderr); - - MoreCommand noError(); - } - - interface MoreCommand extends CommandAdder { - <R> 
ProcessController<R> launch(); - } + /** + * Adds list of files to be localized for the container + */ + PrepareLaunchContext addResources(LocalFile...localFiles); + + /** + * Adds list of files to be localized for the container. + */ + PrepareLaunchContext addResources(Iterable<LocalFile> localFiles); + + /** + * Adds a key value pair to the container environment. + */ + <V> PrepareLaunchContext addEnvironment(String key, V value); + + /** + * Adds a command line to run in the container process. + */ + PrepareLaunchContext addCommand(String cmd, String...args); + + /** + * Launches the container process. + */ + <R> ProcessController<R> launch(); } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 1688073..02606cd 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; -import org.apache.twill.api.LocalFile; import org.apache.twill.api.RunId; import org.apache.twill.api.RuntimeSpecification; import org.apache.twill.filesystem.Location; @@ -70,41 +69,25 @@ public final class TwillContainerLauncher { } public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) { - ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null; - ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null; - // Clean up zookeeper path in case this is a retry and 
there are old messages and state there. Futures.getUnchecked(ZKOperations.ignoreError( ZKOperations.recursiveDelete(zkClient, "/" + runId), KeeperException.NoNodeException.class, null)); // Adds all file to be localized to container - if (!runtimeSpec.getLocalFiles().isEmpty()) { - resourcesAdder = launchContext.withResources(); - - for (LocalFile localFile : runtimeSpec.getLocalFiles()) { - afterResources = resourcesAdder.add(localFile); - } - } + launchContext.addResources(runtimeSpec.getLocalFiles()); // Optionally localize secure store. try { if (secureStoreLocation != null && secureStoreLocation.exists()) { - if (resourcesAdder == null) { - resourcesAdder = launchContext.withResources(); - } - afterResources = resourcesAdder.add(new DefaultLocalFile(Constants.Files.CREDENTIALS, - secureStoreLocation.toURI(), - secureStoreLocation.lastModified(), - secureStoreLocation.length(), false, null)); + launchContext.addResources(new DefaultLocalFile(Constants.Files.CREDENTIALS, + secureStoreLocation.toURI(), + secureStoreLocation.lastModified(), + secureStoreLocation.length(), false, null)); } } catch (IOException e) { LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation); } - if (afterResources == null) { - afterResources = launchContext.noResources(); - } - int memory = runtimeSpec.getResourceSpecification().getMemorySize(); if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) { // Reduce -Xmx by the reserved memory size. 
@@ -115,12 +98,11 @@ public final class TwillContainerLauncher { } // Currently no reporting is supported for runnable containers - ProcessLauncher.PrepareLaunchContext.MoreEnvironment afterEnvironment = afterResources - .withEnvironment() - .add(EnvKeys.TWILL_RUN_ID, runId.getId()) - .add(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName()) - .add(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId)) - .add(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount)); + launchContext + .addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId()) + .addEnvironment(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName()) + .addEnvironment(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId)) + .addEnvironment(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount)); // assemble the command based on jvm options ImmutableList.Builder<String> commandBuilder = ImmutableList.builder(); @@ -157,9 +139,8 @@ public final class TwillContainerLauncher { Boolean.TRUE.toString()); List<String> command = commandBuilder.build(); - ProcessController<Void> processController = afterEnvironment - .withCommands().add(firstCommand, command.toArray(new String[command.size()])) - .redirectOutput(Constants.STDOUT).redirectError(Constants.STDERR) + ProcessController<Void> processController = launchContext + .addCommand(firstCommand, command.toArray(new String[command.size()])) .launch(); TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java index 3f7422d..dfe4e67 100644 --- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java +++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.List; +import javax.annotation.Nullable; /** * @@ -64,7 +65,8 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception { + public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Request for new application final GetNewApplicationResponse response = yarnClient.getNewApplication(); final ApplicationId appId = response.getApplicationId(); @@ -75,6 +77,10 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements appSubmissionContext.setApplicationName(twillSpec.getName()); appSubmissionContext.setUser(user); + if (schedulerQueue != null) { + appSubmissionContext.setQueue(schedulerQueue); + } + ApplicationSubmitter submitter = new ApplicationSubmitter() { @Override @@ -147,9 +153,11 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception { + public ProcessLauncher<ApplicationId> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { this.user = user; - return createLauncher(twillSpec); + return createLauncher(twillSpec, schedulerQueue); } @Override http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java ---------------------------------------------------------------------- diff --git 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java index 9b70a86..ac126ce 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import javax.annotation.Nullable; /** * @@ -58,7 +59,8 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception { + public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Request for new application YarnClientApplication application = yarnClient.createApplication(); final GetNewApplicationResponse response = application.getNewApplicationResponse(); @@ -69,6 +71,10 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements appSubmissionContext.setApplicationId(appId); appSubmissionContext.setApplicationName(twillSpec.getName()); + if (schedulerQueue != null) { + appSubmissionContext.setQueue(schedulerQueue); + } + ApplicationSubmitter submitter = new ApplicationSubmitter() { @Override public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) { @@ -128,9 +134,11 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception { + public ProcessLauncher<ApplicationId> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Ignore user - return createLauncher(twillSpec); + return 
createLauncher(twillSpec, schedulerQueue); } @Override http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java index 1c28c47..ebbb559 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java @@ -30,6 +30,7 @@ import org.apache.twill.internal.utils.Paths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -113,108 +114,57 @@ public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher< } @Override - public ResourcesAdder withResources() { - return new MoreResourcesImpl(); + public PrepareLaunchContext addResources(LocalFile... 
localFiles) { + return addResources(Arrays.asList(localFiles)); } @Override - public AfterResources noResources() { - return new MoreResourcesImpl(); - } - - private final class MoreResourcesImpl implements MoreResources { - - @Override - public MoreResources add(LocalFile localFile) { + public PrepareLaunchContext addResources(Iterable<LocalFile> localFiles) { + for (LocalFile localFile : localFiles) { addLocalFile(localFile); - return this; - } - - @Override - public EnvironmentAdder withEnvironment() { - return finish(); - } - - @Override - public AfterEnvironment noEnvironment() { - return finish(); - } - - private MoreEnvironmentImpl finish() { - launchContext.setLocalResources(localResources); - return new MoreEnvironmentImpl(); } + return this; } - private final class MoreEnvironmentImpl implements MoreEnvironment { - - @Override - public CommandAdder withCommands() { - launchContext.setEnvironment(environment); - return new MoreCommandImpl(); - } - - @Override - public <V> MoreEnvironment add(String key, V value) { - environment.put(key, value.toString()); - return this; - } + @Override + public <V> PrepareLaunchContext addEnvironment(String key, V value) { + environment.put(key, value.toString()); + return this; } - private final class MoreCommandImpl implements MoreCommand, StdOutSetter, StdErrSetter { - - private final StringBuilder commandBuilder = new StringBuilder(); - - @Override - public StdOutSetter add(String cmd, String... 
args) { - commandBuilder.append(cmd); - for (String arg : args) { - commandBuilder.append(' ').append(arg); - } - return this; - } - - @Override - public <R> ProcessController<R> launch() { - if (credentials != null && !credentials.getAllTokens().isEmpty()) { - for (Token<?> token : credentials.getAllTokens()) { - LOG.info("Launch with delegation token {}", token); - } - launchContext.setCredentials(credentials); - } - launchContext.setCommands(commands); - return doLaunch(launchContext); + @Override + public PrepareLaunchContext addCommand(String cmd, String... args) { + StringBuilder builder = new StringBuilder(cmd); + for (String arg : args) { + builder.append(' ').append(arg); } - @Override - public MoreCommand redirectError(String stderr) { - redirect(2, stderr); - return noError(); - } + // Redirect stdout and stderr + redirect(1, ApplicationConstants.STDOUT, builder); + redirect(2, ApplicationConstants.STDERR, builder); - @Override - public MoreCommand noError() { - commands.add(commandBuilder.toString()); - commandBuilder.setLength(0); - return this; - } - - @Override - public StdErrSetter redirectOutput(String stdout) { - redirect(1, stdout); - return this; - } + commands.add(builder.toString()); + return this; + } - @Override - public StdErrSetter noOutput() { - return this; + @Override + public <R> ProcessController<R> launch() { + launchContext.setLocalResources(localResources); + launchContext.setEnvironment(environment); + if (credentials != null && !credentials.getAllTokens().isEmpty()) { + for (Token<?> token : credentials.getAllTokens()) { + LOG.info("Launch with delegation token {}", token); + } + launchContext.setCredentials(credentials); } + launchContext.setCommands(commands); + return doLaunch(launchContext); + } - private void redirect(int type, String out) { - commandBuilder.append(' ') - .append(type).append('>') - .append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out); - } + private void redirect(int type, String out, 
StringBuilder commandBuilder) { + commandBuilder.append(' ') + .append(type).append('>') + .append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out); } } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java index 3b03a2a..67a2292 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java @@ -25,6 +25,7 @@ import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; import java.util.List; +import javax.annotation.Nullable; /** * Interface for launching Yarn application from client. @@ -32,18 +33,26 @@ import java.util.List; public interface YarnAppClient extends Service { /** - * Creates a {@link ProcessLauncher} for launching the application represented by the given spec. + * Creates a {@link ProcessLauncher} for launching the application represented by the given spec. If scheduler queue + * is available and is supported by the YARN cluster, it will be launched in the given queue. */ - ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception; + ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception; /** - * Creates a {@link ProcessLauncher} for launching application with the given user and spec. + * Creates a {@link ProcessLauncher} for launching application with the given user and spec. If scheduler queue + * is available and is supported by the YARN cluster, it will be launched in the given queue. * * @deprecated This method will get removed. 
*/ @Deprecated - ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception; + ProcessLauncher<ApplicationId> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception; + /** + * Creates a {@link ProcessController} that can control an application represented by the given application id. + */ ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId); /** http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/6bf8d95a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index e78889a..ee889a8 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -124,6 +124,7 @@ final class YarnTwillPreparer implements TwillPreparer { private final Credentials credentials; private final int reservedMemory; private String user; + private String schedulerQueue; private String extraOptions; private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG; @@ -158,6 +159,12 @@ final class YarnTwillPreparer implements TwillPreparer { } @Override + public TwillPreparer setSchedulerQueue(String name) { + this.schedulerQueue = name; + return this; + } + + @Override public TwillPreparer setJVMOptions(String options) { this.extraOptions = options; return this; @@ -246,7 +253,7 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillController start() { try { - final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(user, twillSpec); + final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(user, twillSpec, schedulerQueue); final 
ApplicationId appId = launcher.getContainerInfo(); Callable<ProcessController<YarnApplicationReport>> submitTask = @@ -291,10 +298,7 @@ final class YarnTwillPreparer implements TwillPreparer { .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS)) .build(), localFiles.values(), credentials - ) - .noResources() - .noEnvironment() - .withCommands().add( + ).addCommand( "$JAVA_HOME/bin/java", "-Djava.io.tmpdir=tmp", "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR, @@ -306,8 +310,6 @@ final class YarnTwillPreparer implements TwillPreparer { Constants.Files.APP_MASTER_JAR, ApplicationMasterMain.class.getName(), Boolean.FALSE.toString()) - .redirectOutput(Constants.STDOUT) - .redirectError(Constants.STDERR) .launch(); } };
