Repository: incubator-reef Updated Branches: refs/heads/master b6b9e39bb -> 3aaca1841
[REEF-456] Make job submission folder prefix configurable This addressed the issue by adding a `JobSubmissionDirectoryPrefix` named parameter and propogating it to driver. JIRA: [REEF-456](https://issues.apache.org/jira/browse/REEF-456) Pull Request: This closes #284 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/3aaca184 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/3aaca184 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/3aaca184 Branch: refs/heads/master Commit: 3aaca18417ca2553313dcd0a85516dbd09b58d72 Parents: b6b9e39 Author: Beysim Sezgin <[email protected]> Authored: Wed Jul 8 09:47:29 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Jul 9 12:19:30 2015 -0700 ---------------------------------------------------------------------- .../resourcemanager/ResourceManagerStatus.java | 2 +- .../yarn/client/YarnJobSubmissionHandler.java | 32 ++++++++---- .../yarn/client/uploader/JobUploader.java | 37 ++++++++++---- .../driver/JobSubmissionDirectoryProvider.java | 36 +++++++++++++ .../JobSubmissionDirectoryProviderImpl.java | 54 ++++++++++++++++++++ .../JobSubmissionDirectoryPrefix.java | 29 +++++++++++ 6 files changed, 168 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java index 24c8e2f..b10262a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java @@ -66,7 +66,7 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) { final ReefServiceProtos.State newState = runtimeStatusEvent.getState(); LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent); - this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().get(); + this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().orElse(0); this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size(); this.setState(runtimeStatusEvent.getState()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index eaa960d..5bd17ac 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -36,10 +36,11 @@ import org.apache.reef.runtime.yarn.client.uploader.JobUploader; import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.ConfigurationSerializer; -import org.apache.reef.tang.types.NamedParameterNode; -import org.apache.reef.tang.util.ReflectionUtilities; +import org.apache.reef.util.Optional; import javax.inject.Inject; import java.io.File; @@ -47,6 +48,8 @@ import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; +import static org.apache.reef.util.Optional.*; + @Private @ClientSide final class YarnJobSubmissionHandler implements JobSubmissionHandler { @@ -93,7 +96,10 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath)) { LOG.log(Level.FINE, "Assembling submission JAR for the Driver."); - final JobFolder jobFolderOnDfs = this.uploader.createJobFolder(submissionHelper.getApplicationId()); + final Optional<String> userBoundJobSubmissionDirectory = getUserBoundJobSubmissionDirectory(jobSubmissionEvent.getConfiguration()); + final JobFolder jobFolderOnDfs = userBoundJobSubmissionDirectory.isPresent() + ? this.uploader.createJobFolder(userBoundJobSubmissionDirectory.get()) + : this.uploader.createJobFolder(submissionHelper.getApplicationId()); final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, jobFolderOnDfs.getPath()); final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); final LocalResource driverJarOnDfs = jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile); @@ -118,20 +124,14 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { private Configuration makeDriverConfiguration( final JobSubmissionEvent jobSubmissionEvent, final Path jobFolderPath) throws IOException { - final Configuration config = jobSubmissionEvent.getConfiguration(); - final String userBoundJobSubmissionDirectory = config.getNamedParameter((NamedParameterNode<?>) config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class))); - LOG.log(Level.FINE, "user bound job submission Directory: " + userBoundJobSubmissionDirectory); - final String finalJobFolderPath = - (userBoundJobSubmissionDirectory == null || userBoundJobSubmissionDirectory.isEmpty()) - ? jobFolderPath.toString() : userBoundJobSubmissionDirectory; return Configurations.merge( YarnDriverConfiguration.CONF - .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, finalJobFolderPath) + .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolderPath.toString()) .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionEvent.getIdentifier()) .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionEvent.getRemoteId()) .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack) .build(), - config); + jobSubmissionEvent.getConfiguration()); } private static int getPriority(final JobSubmissionEvent jobSubmissionEvent) { @@ -147,4 +147,14 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { final String defaultQueue) { return jobSubmissionEvent.getQueue().orElse(defaultQueue); } + + private static org.apache.reef.util.Optional<String> getUserBoundJobSubmissionDirectory(final Configuration configuration) { + try { + return Optional.ofNullable(Tang.Factory.getTang().newInjector(configuration).getNamedInstance(DriverJobSubmissionDirectory.class)); + } catch (InjectionException ex) { + return Optional.empty(); + } + + } + } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java index 417c069..98edbf5 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java @@ -21,23 +21,26 @@ package org.apache.reef.runtime.yarn.client.uploader; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.reef.runtime.common.files.REEFFileNames; - +import org.apache.reef.runtime.yarn.driver.JobSubmissionDirectoryProvider; import javax.inject.Inject; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Helper class to upload the driver files to HDFS. */ public final class JobUploader { + private static final Logger LOG = Logger.getLogger(JobUploader.class.getName()); + private final FileSystem fileSystem; - private final REEFFileNames fileNames; + private final JobSubmissionDirectoryProvider jobSubmissionDirectoryProvider; @Inject JobUploader(final YarnConfiguration yarnConfiguration, - final REEFFileNames fileNames) throws IOException { - this.fileNames = fileNames; + JobSubmissionDirectoryProvider jobSubmissionDirectoryProvider) throws IOException { + this.jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider; this.fileSystem = FileSystem.get(yarnConfiguration); } @@ -48,10 +51,24 @@ public final class JobUploader { * @return a reference to the JobFolder that can be used to upload files to it. * @throws IOException */ - public JobFolder createJobFolder(final String applicationId) throws IOException { - // TODO: This really should be configurable, but wasn't in the code I moved as part of [REEF-228] - final Path jobFolderPath = new Path("/tmp/" + this.fileNames.getJobFolderPrefix() + applicationId + "/"); - return new JobFolder(this.fileSystem, jobFolderPath); + public JobFolder createJobFolderWithApplicationId(final String applicationId) throws IOException { + final Path jobFolderPath = jobSubmissionDirectoryProvider.getJobSubmissionDirectoryPath(applicationId); + final String finalJobFolderPath = jobFolderPath.toString(); + LOG.log(Level.FINE, "Final job submission Directory: " + finalJobFolderPath); + return createJobFolder(finalJobFolderPath); + } + + + /** + * Convenience override for int ids. + * + * @param finalJobFolderPath + * @return + * @throws IOException + */ + public JobFolder createJobFolder(final String finalJobFolderPath) throws IOException { + LOG.log(Level.FINE, "Final job submission Directory: " + finalJobFolderPath); + return new JobFolder(this.fileSystem, new Path(finalJobFolderPath)); } /** @@ -62,6 +79,6 @@ public final class JobUploader { * @throws IOException */ public JobFolder createJobFolder(final int applicationId) throws IOException { - return this.createJobFolder(Integer.toString(applicationId)); + return this.createJobFolderWithApplicationId(Integer.toString(applicationId)); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java new file mode 100644 index 0000000..35d0048 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.driver; + +import org.apache.hadoop.fs.Path; +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * Provides path to job submission directory. + * + */ +@DefaultImplementation(JobSubmissionDirectoryProviderImpl.class) +public interface JobSubmissionDirectoryProvider { + /** + * Returns path to job submission directory. + * + * @return job submission directory + */ + Path getJobSubmissionDirectoryPath(String applicationId); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java new file mode 100644 index 0000000..0222ddb --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.driver; + +import org.apache.hadoop.fs.Path; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.text.SimpleDateFormat; +import java.util.Calendar; + +public final class JobSubmissionDirectoryProviderImpl implements JobSubmissionDirectoryProvider { + + /** + * The path on (H)DFS which is used as the job's folder. + */ + private final String jobSubmissionDirectory; + private final REEFFileNames fileNames; + + @Inject + JobSubmissionDirectoryProviderImpl(@Parameter(JobSubmissionDirectoryPrefix.class) final String jobSubmissionDirectoryPrefix, + final REEFFileNames fileNames) { + this.jobSubmissionDirectory = jobSubmissionDirectoryPrefix; + this.fileNames = fileNames; + } + + @Override + public Path getJobSubmissionDirectoryPath(String applicationId) { + return new Path(this.jobSubmissionDirectory + + "/" + + new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_").format(Calendar.getInstance().getTime()) + + this.fileNames.getJobFolderPrefix() + + applicationId + + "/"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java new file mode 100644 index 0000000..30f1741 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.driver.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The job submission directory. + */ +@NamedParameter(doc = "The job submission directory prefix.", default_value = "/vol1/tmp") +public final class JobSubmissionDirectoryPrefix implements Name<String> { +}
