Repository: incubator-reef Updated Branches: refs/heads/master f632869a5 -> ac1555b19
[REEF-454] Allow for per-job queues This adds `YarnDriverConfiguration` that can be filled out by an application and merged into their DriverConfiguration to set the queue to submit the job to. This also makes sure the `YarnClientConfiguration.YARN_QUEUE_NAME` is actually used (it wasn't). It now provides the default for when the new per-job setting isn't used. Lastly, this change deprecates `JobSubmissionEvent.getQueue()` which was never used in the first place. Its purpose is now served by the mechanism introduced here. JIRA: [REEF-454](https://issues.apache.org/jira/browse/REEF-454) Pull Request: This closes #279 Author: Markus Weimer <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ac1555b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ac1555b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ac1555b1 Branch: refs/heads/master Commit: ac1555b19c715f3ebb3cb6c59561b2df1ff22a4a Parents: f632869 Author: Markus Weimer <[email protected]> Authored: Mon Jul 6 17:44:59 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Tue Jul 14 00:06:02 2015 -0700 ---------------------------------------------------------------------- .../common/client/api/JobSubmissionEvent.java | 2 + .../client/api/JobSubmissionEventImpl.java | 2 + .../yarn/client/YarnDriverConfiguration.java | 46 ++++++++++++++++++++ .../yarn/client/YarnJobSubmissionHandler.java | 35 +++++++++------ 4 files changed, 72 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java index 76f9877..7bf3123 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java @@ -76,6 +76,8 @@ public interface JobSubmissionEvent { /** * @return Queue to submit the Job to + * @deprecated in 0.12. Use org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead. */ + @Deprecated Optional<String> getQueue(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java index d63f881..3a6e9dd 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java @@ -184,7 +184,9 @@ public final class JobSubmissionEventImpl implements JobSubmissionEvent { /** * @see JobSubmissionEvent#getQueue() + * @deprecated in 0.12. Use org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead. */ + @Deprecated public Builder setQueue(final String queue) { this.queue = queue; return this; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java new file mode 100644 index 0000000..9975f36 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.runtime.yarn.client.parameters.JobQueue; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.OptionalParameter; + +/** + * Additional YARN-Specific configuration options to be merged with DriverConfiguration. + */ +@Public +@ClientSide +public final class YarnDriverConfiguration extends ConfigurationModuleBuilder { + + /** + * The queue to submit this Driver to. + */ + public static final OptionalParameter<String> QUEUE = new OptionalParameter<>(); + + /** + * ConfigurationModule to set YARN-Specific configuration options to be merged with DriverConfiguration. + */ + public static final ConfigurationModule CONF = new YarnDriverConfiguration() + .bindNamedParameter(JobQueue.class, QUEUE) + .build(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index 0eb7246..a4e3412 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -31,6 +31,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.JobJarMaker; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.yarn.client.parameters.JobQueue; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; import org.apache.reef.runtime.yarn.client.uploader.JobUploader; import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; @@ -39,7 +40,6 @@ import org.apache.reef.tang.Configurations; import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.Optional; import javax.inject.Inject; @@ -48,8 +48,6 @@ import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; -import static org.apache.reef.util.Optional.*; - @Private @ClientSide final class YarnJobSubmissionHandler implements JobSubmissionHandler { @@ -60,9 +58,9 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { private final JobJarMaker jobJarMaker; private final REEFFileNames fileNames; private final ClasspathProvider classpath; - private final ConfigurationSerializer configurationSerializer; private final JobUploader uploader; private final double jvmSlack; + private final String defaultQueueName; @Inject YarnJobSubmissionHandler( @@ -70,17 +68,17 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { final JobJarMaker jobJarMaker, final REEFFileNames fileNames, final ClasspathProvider classpath, - final ConfigurationSerializer configurationSerializer, final JobUploader uploader, - @Parameter(JVMHeapSlack.class) final double jvmSlack) throws IOException { + @Parameter(JVMHeapSlack.class) final double jvmSlack, + @Parameter(JobQueue.class) String defaultQueueName) throws IOException { this.yarnConfiguration = yarnConfiguration; this.jobJarMaker = jobJarMaker; this.fileNames = fileNames; this.classpath = classpath; - this.configurationSerializer = configurationSerializer; this.uploader = uploader; this.jvmSlack = jvmSlack; + this.defaultQueueName = defaultQueueName; } @Override @@ -110,7 +108,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { .setApplicationName(jobSubmissionEvent.getIdentifier()) .setDriverMemory(jobSubmissionEvent.getDriverMemory().get()) .setPriority(getPriority(jobSubmissionEvent)) - .setQueue(getQueue(jobSubmissionEvent, "default")) + .setQueue(getQueue(jobSubmissionEvent)) .submit(jobSubmissionEvent.getRemoteId()); LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier()); @@ -141,12 +139,23 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { /** * Extract the queue name from the jobSubmissionEvent or return default if none is set. - * <p/> - * TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration. */ - private String getQueue(final JobSubmissionEvent jobSubmissionEvent, - final String defaultQueue) { - return jobSubmissionEvent.getQueue().orElse(defaultQueue); + private String getQueue(final JobSubmissionEvent jobSubmissionEvent) { + return getQueue(jobSubmissionEvent.getConfiguration()); + } + + /** + * Extracts the queue name from the driverConfiguration or return default if none is set. + * + * @param driverConfiguration + * @return the queue name from the driverConfiguration or return default if none is set. + */ + private String getQueue(final Configuration driverConfiguration) { + try { + return Tang.Factory.getTang().newInjector(driverConfiguration).getNamedInstance(JobQueue.class); + } catch (final Throwable t) { + return this.defaultQueueName; + } } private static Optional<String> getUserBoundJobSubmissionDirectory(final Configuration configuration) {
