Repository: incubator-reef
Updated Branches:
refs/heads/master 20ee916d8 -> 4f5bb42c5
[REEF-490] Expose configuration to preserve containers across application on
YARN
This addressed the issue by
* Adding ResourceManagerPreserveEvaluators to
DriverRuntimeRestartConfiguration.
* Adding MaxApplicationSubmissions to DriverConfiguration.
* Adding PreserveEvaluators and MaxApplicationSubmissions to
JobSubmissionEvent.
* Adding command line arguments from .NET to support MaxApplicationSubmissions
and ResourceManagerPreserveEvaluators.
JIRA:
[REEF-490](https://issues.apache.org/jira/browse/REEF-490)
Pull Request:
This closes #347
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4f5bb42c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4f5bb42c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4f5bb42c
Branch: refs/heads/master
Commit: 4f5bb42c511d43c63644e6962982175023fc2b2f
Parents: 20ee916
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 6 15:45:59 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 10 16:04:57 2015 -0700
----------------------------------------------------------------------
.../ClrClient2JavaClientCuratedParameters.cs | 39 +++++-
.../Org.Apache.REEF.Client/YARN/YARNClient.cs | 4 +-
.../Bridge/DriverBridgeConfigurationOptions.cs | 5 +
.../DriverConfiguration.cs | 8 ++
.../bridge/client/YarnJobSubmissionClient.java | 120 ++++++++++++++-----
.../apache/reef/client/DriverConfiguration.java | 8 +-
.../parameters/MaxApplicationSubmissions.java | 32 +++++
.../ResourceManagerPreserveEvaluators.java | 32 +++++
.../common/client/JobSubmissionHelper.java | 6 +-
.../common/client/api/JobSubmissionEvent.java | 11 ++
.../client/api/JobSubmissionEventImpl.java | 32 +++++
.../DriverRuntimeRestartConfiguration.java | 5 +
.../yarn/client/YarnJobSubmissionHandler.java | 18 ++-
.../yarn/client/YarnSubmissionHelper.java | 68 ++++++++---
.../reef/runtime/yarn/util/YarnTypes.java | 3 +
15 files changed, 338 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
index e40ab59..473e223 100644
---
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
@@ -16,17 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
+
+using System;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.Common
{
+ /// <summary>
+ /// Curated parameters for CLR to Java. Passes a set of command line
parameters to YarnJobSubmissionClient on
+ /// the Java side. The command line parameters should be strictly ordered.
+ /// Note that the EnableRestart parameter will only be true if the user
ever binds a DriverRestartedHandler.
+ /// </summary>
internal class ClrClient2JavaClientCuratedParameters
{
public int TcpPortRangeStart { get; private set; }
public int TcpPortRangeCount { get; private set; }
public int TcpPortRangeTryCount { get; private set; }
public int TcpPortRangeSeed { get; private set; }
+ public int MaxApplicationSubmissions { get; private set; }
+ public bool EnableRestart { get; private set; }
[Inject]
@@ -34,12 +45,38 @@ namespace Org.Apache.REEF.Client.Common
[Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
[Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
[Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
- [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed)
+ [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
+
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))]
int maxApplicationSubmissions,
+
[Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartedHandler))]
IObserver<IDriverRestarted> restartHandler)
+ : this(tcpPortRangeStart, tcpPortRangeCount, tcpPortRangeTryCount,
tcpPortRangeSeed, maxApplicationSubmissions, true)
+ {
+ }
+
+ [Inject]
+ private ClrClient2JavaClientCuratedParameters(
+ [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
+ [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
+ [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
+ [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
+
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))]
int maxApplicationSubmissions)
+ : this(tcpPortRangeStart, tcpPortRangeCount, tcpPortRangeTryCount,
tcpPortRangeSeed, maxApplicationSubmissions, false)
+ {
+ }
+
+ private ClrClient2JavaClientCuratedParameters(
+ int tcpPortRangeStart,
+ int tcpPortRangeCount,
+ int tcpPortRangeTryCount,
+ int tcpPortRangeSeed,
+ int maxApplicationSubmissions,
+ bool enableRestart)
{
TcpPortRangeStart = tcpPortRangeStart;
TcpPortRangeCount = tcpPortRangeCount;
TcpPortRangeTryCount = tcpPortRangeTryCount;
TcpPortRangeSeed = tcpPortRangeSeed;
+ MaxApplicationSubmissions = maxApplicationSubmissions;
+ EnableRestart = enableRestart;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
index b6fcc0c..b415ccb 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -67,7 +67,9 @@ namespace Org.Apache.REEF.Client.YARN
jobSubmission.DriverMemory.ToString(),
javaParams.TcpPortRangeStart.ToString(),
javaParams.TcpPortRangeCount.ToString(),
- javaParams.TcpPortRangeTryCount.ToString()
+ javaParams.TcpPortRangeTryCount.ToString(),
+ javaParams.MaxApplicationSubmissions.ToString(),
+ javaParams.EnableRestart.ToString()
);
Logger.Log(Level.Info, "Submitted the Driver for execution.");
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index f62f69b..9935a89 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -138,6 +138,11 @@ namespace Org.Apache.REEF.Driver.Bridge
{
}
+ [NamedParameter("The number of times an application should be
submitted in case of failure.", "MaxApplicationSubmissions", "1")]
+ public class MaxApplicationSubmissions : Name<int>
+ {
+ }
+
[NamedParameter("Command Line Arguments supplied by client",
"CommandLineArguments", null)]
public class ArgumentSets : Name<ISet<string>>
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index c72a9da..70c4ce1 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -158,6 +158,12 @@ namespace Org.Apache.REEF.Driver
new OptionalParameter<TraceListener>();
/// <summary>
+ /// The number of times the application should be submitted in case of
failures
+ /// </summary>
+ public static readonly OptionalParameter<int>
MaxApplicationSubmissions =
+ new OptionalParameter<int>();
+
+ /// <summary>
/// The implemenation for (attempting to) re-establish connection to
driver
/// </summary>
public static readonly OptionalImpl<IDriverConnection>
OnDriverReconnect = new OptionalImpl<IDriverConnection>();
@@ -203,6 +209,8 @@ namespace Org.Apache.REEF.Driver
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
OnDriverRestartTaskRunning)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
+
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
+ MaxApplicationSubmissions)
.Build()
// TODO: Move this up
.Set(OnDriverStarted,
GenericType<ClassHierarchyGeneratingDriverStartObserver>.Class);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index e707549..de8638b 100644
---
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
+import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
import org.apache.reef.io.TcpPortConfigurationProvider;
import org.apache.reef.javabridge.generic.JobDriver;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
@@ -35,6 +37,9 @@ import
org.apache.reef.runtime.yarn.client.uploader.JobUploader;
import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.JARFileMaker;
@@ -61,21 +66,31 @@ public final class YarnJobSubmissionClient {
private final REEFFileNames fileNames;
private final YarnConfiguration yarnConfiguration;
private final ClasspathProvider classpath;
+ private final int maxApplicationSubmissions;
+ private final boolean enableRestart;
@Inject
YarnJobSubmissionClient(final JobUploader uploader,
final YarnConfiguration yarnConfiguration,
final ConfigurationSerializer
configurationSerializer,
final REEFFileNames fileNames,
- final ClasspathProvider classpath) {
+ final ClasspathProvider classpath,
+ @Parameter(MaxApplicationSubmissions.class)
+ final int maxApplicationSubmissions,
+ @Parameter(EnableRestart.class)
+ final boolean enableRestart) {
this.uploader = uploader;
this.configurationSerializer = configurationSerializer;
this.fileNames = fileNames;
this.yarnConfiguration = yarnConfiguration;
this.classpath = classpath;
+ this.maxApplicationSubmissions = maxApplicationSubmissions;
+ this.enableRestart = enableRestart;
}
- private void addYarnDriverConfiguration(final File driverFolder, final
String jobId, final String jobSubmissionFolder)
+ private Configuration addYarnDriverConfiguration(final File driverFolder,
+ final String jobId,
+ final String
jobSubmissionFolder)
throws IOException {
final File driverConfigurationFile = new File(driverFolder,
this.fileNames.getDriverConfigurationPath());
final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
@@ -85,28 +100,32 @@ public final class YarnJobSubmissionClient {
.set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
.build();
- final Configuration yarnDriverRestartConfiguration =
- YarnDriverRestartConfiguration.CONF
- .build();
-
- final Configuration driverRestartConfiguration =
- DriverRestartConfiguration.CONF
- .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED,
JobDriver.RestartHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
- JobDriver.DriverRestartActiveContextHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
- JobDriver.DriverRestartRunningTaskHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
- JobDriver.DriverRestartCompletedHandler.class)
- .build();
-
- final Configuration driverConfiguration = Configurations.merge(
+ Configuration driverConfiguration = Configurations.merge(
Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
- yarnDriverConfiguration,
- yarnDriverRestartConfiguration,
- driverRestartConfiguration);
+ yarnDriverConfiguration);
+
+ if (this.enableRestart) {
+ final Configuration yarnDriverRestartConfiguration =
+ YarnDriverRestartConfiguration.CONF
+ .build();
+
+ final Configuration driverRestartConfiguration =
+ DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED,
JobDriver.RestartHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+ JobDriver.DriverRestartActiveContextHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+ JobDriver.DriverRestartRunningTaskHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+ JobDriver.DriverRestartCompletedHandler.class)
+ .build();
+
+ driverConfiguration = Configurations.merge(
+ driverConfiguration, yarnDriverRestartConfiguration,
driverRestartConfiguration);
+ }
this.configurationSerializer.toFile(driverConfiguration,
driverConfigurationFile);
+ return driverConfiguration;
}
/**
@@ -154,7 +173,8 @@ public final class YarnJobSubmissionClient {
//
------------------------------------------------------------------------
// Prepare the JAR
final JobFolder jobFolderOnDFS =
this.uploader.createJobFolder(submissionHelper.getApplicationId());
- this.addYarnDriverConfiguration(driverFolder, jobId,
jobFolderOnDFS.getPath().toString());
+ final Configuration jobSubmissionConfiguration =
+ this.addYarnDriverConfiguration(driverFolder, jobId,
jobFolderOnDFS.getPath().toString());
final File jarFile = makeJar(driverFolder);
LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
@@ -165,22 +185,31 @@ public final class YarnJobSubmissionClient {
final LocalResource jarFileOnDFS =
jobFolderOnDFS.uploadAsLocalResource(jarFile);
LOG.info("Uploaded job submission JAR");
+ final Injector jobParamsInjector =
Tang.Factory.getTang().newInjector(jobSubmissionConfiguration);
//
------------------------------------------------------------------------
// Submit
- submissionHelper
- .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
- .setApplicationName(jobId)
- .setDriverMemory(driverMemory)
- .setPriority(priority)
- .setQueue(queue)
- .submit(ClientRemoteIdentifier.NONE);
+ try {
+ submissionHelper
+ .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+ .setApplicationName(jobId)
+ .setDriverMemory(driverMemory)
+ .setPriority(priority)
+ .setQueue(queue)
+ .setMaxApplicationAttempts(this.maxApplicationSubmissions)
+
.setPreserveEvaluators(jobParamsInjector.getNamedInstance(ResourceManagerPreserveEvaluators.class))
+ .submit();
+ } catch (InjectionException ie) {
+ throw new RuntimeException("Unable to submit job due to " + ie);
+ }
}
}
private static Configuration getRuntimeConfiguration(final int tcpBeginPort,
final int tcpRangeCount,
- final int tcpTryCount) {
+ final int tcpTryCount,
+ final boolean
enableRestart,
+ final int
maxApplicationSubmissions) {
final Configuration yarnClientConfig = YarnClientConfiguration.CONF
.build();
@@ -191,9 +220,23 @@ public final class YarnJobSubmissionClient {
.bindNamedParameter(TcpPortRangeTryCount.class,
Integer.toString(tcpTryCount))
.build();
- return Configurations.merge(yarnClientConfig, providerConfig);
+ final Configuration yarnJobSubmissionClientParamsConfig =
Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(EnableRestart.class,
Boolean.toString(enableRestart))
+ .bindNamedParameter(MaxApplicationSubmissions.class,
Integer.toString(maxApplicationSubmissions))
+ .build();
+
+ return Configurations.merge(yarnClientConfig, providerConfig,
yarnJobSubmissionClientParamsConfig);
}
+ /**
+ * Takes 8 parameters from the C# side:
+ * [0]: String. Driver folder.
+ * [1]: String. Driver identifier.
+ * [2]: int. Driver memory.
+ * [3~5]: int. TCP configurations.
+ * [6]: int. Max application submissions.
+ * [7]: boolean. Enable restart.
+ */
public static void main(final String[] args) throws InjectionException,
IOException, YarnException {
final File driverFolder = new File(args[0]);
final String jobId = args[1];
@@ -201,11 +244,15 @@ public final class YarnJobSubmissionClient {
final int tcpBeginPort = Integer.valueOf(args[3]);
final int tcpRangeCount = Integer.valueOf(args[4]);
final int tcpTryCount = Integer.valueOf(args[5]);
+ final int maxApplicationSubmissions = Integer.valueOf(args[6]);
+ final boolean enableRestart = Boolean.valueOf(args[7]);
+
// Static for now
final int priority = 1;
final String queue = "default";
- final Configuration yarnConfiguration =
getRuntimeConfiguration(tcpBeginPort, tcpRangeCount, tcpTryCount);
+ final Configuration yarnConfiguration = getRuntimeConfiguration(
+ tcpBeginPort, tcpRangeCount, tcpTryCount, enableRestart,
maxApplicationSubmissions);
final YarnJobSubmissionClient client = Tang.Factory.getTang()
.newInjector(yarnConfiguration)
.getInstance(YarnJobSubmissionClient.class);
@@ -213,3 +260,12 @@ public final class YarnJobSubmissionClient {
client.launch(driverFolder, jobId, priority, queue, driverMemory);
}
}
+
+/**
+ * Whether the job driver should enable restart. Only used by C# job
submission.
+ */
+@NamedParameter(doc = "Whether the job driver should enable restart",
default_value = "false")
+final class EnableRestart implements Name<Boolean> {
+ private EnableRestart() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
index a1ae9c4..49e55b9 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
@@ -177,17 +177,23 @@ public final class DriverConfiguration extends
ConfigurationModuleBuilder {
public static final OptionalImpl<EventHandler<ContextMessage>>
ON_CONTEXT_MESSAGE = new OptionalImpl<>();
/**
- * "Number of threads allocated per evaluator to dispatch events from this
Evaluator.
+ * Number of threads allocated per evaluator to dispatch events from this
Evaluator.
*/
public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS
= new OptionalParameter<>();
/**
+ * The maximum number of times that the resource manager will attempt to
submit the application. Defaults to 1.
+ */
+ public static final OptionalParameter<Integer> MAX_APPLICATION_SUBMISSIONS =
new OptionalParameter<>();
+
+ /**
* ConfigurationModule to fill out to get a legal Driver Configuration.
*/
public static final ConfigurationModule CONF = new
DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
.bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER)
.bindNamedParameter(DriverMemory.class, DRIVER_MEMORY)
.bindNamedParameter(DriverJobSubmissionDirectory.class,
DRIVER_JOB_SUBMISSION_DIRECTORY)
+ .bindNamedParameter(MaxApplicationSubmissions.class,
MAX_APPLICATION_SUBMISSIONS)
.bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
.bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
.bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
new file mode 100644
index 0000000..38fa033
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The number of times the resource manager should attempt to submit the
application.
+ */
+@NamedParameter(doc = "The number of times the resource manager should attempt
to submit the application.",
+ default_value = "1")
+public final class MaxApplicationSubmissions implements Name<Integer> {
+ private MaxApplicationSubmissions() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
new file mode 100644
index 0000000..40ada88
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Whether the resource manager should preserve evaluators on job driver
failure.
+ */
+@NamedParameter(doc = "Whether the resource manager should preserve
evaluators" +
+ " on job driver failure.", default_value = "false")
+public final class ResourceManagerPreserveEvaluators implements Name<Boolean> {
+ private ResourceManagerPreserveEvaluators() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
index 0b6e354..a974ed7 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
@@ -77,13 +77,17 @@ final class JobSubmissionHelper {
throws InjectionException, IOException {
final Injector injector =
Tang.Factory.getTang().newInjector(driverConfiguration);
+ final boolean preserveEvaluators =
injector.getNamedInstance(ResourceManagerPreserveEvaluators.class);
+ final int maxAppSubmissions =
injector.getNamedInstance(MaxApplicationSubmissions.class);
+
final JobSubmissionEventImpl.Builder jbuilder =
JobSubmissionEventImpl.newBuilder()
.setIdentifier(returnOrGenerateDriverId(injector.getNamedInstance(DriverIdentifier.class)))
.setDriverMemory(injector.getNamedInstance(DriverMemory.class))
.setUserName(System.getProperty("user.name"))
+ .setPreserveEvaluators(preserveEvaluators)
+ .setMaxApplicationSubmissions(maxAppSubmissions)
.setConfiguration(driverConfiguration);
-
for (final String globalFileName :
injector.getNamedInstance(JobGlobalFiles.class)) {
LOG.log(Level.FINEST, "Adding global file: {0}", globalFileName);
jbuilder.addGlobalFile(getFileResourceProto(globalFileName,
FileType.PLAIN));
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
index 7bf3123..094ad2c 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
@@ -75,6 +75,17 @@ public interface JobSubmissionEvent {
Optional<Integer> getPriority();
/**
+ * @return True if evaluators are to be preserved across driver failures.
+ */
+ Optional<Boolean> getPreserveEvaluators();
+
+ /**
+ * Returns the number of times that the driver should be started by the
resource manager
+ * if it fails unexpectedly.
+ */
+ Optional<Integer> getMaxApplicationSubmissions();
+
+ /**
* @return Queue to submit the Job to
* @deprecated in 0.12. Use
org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
*/
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
index 3a6e9dd..4ac0a36 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
@@ -40,6 +40,8 @@ public final class JobSubmissionEventImpl implements
JobSubmissionEvent {
private final Optional<Integer> driverMemory;
private final Optional<Integer> priority;
private final Optional<String> queue;
+ private final Optional<Boolean> preserveEvaluators;
+ private final Optional<Integer> maxApplicationSubmissions;
private JobSubmissionEventImpl(final Builder builder) {
this.identifier = BuilderUtils.notNull(builder.identifier);
@@ -50,7 +52,9 @@ public final class JobSubmissionEventImpl implements
JobSubmissionEvent {
this.localFileSet = BuilderUtils.notNull(builder.localFileSet);
this.driverMemory = Optional.ofNullable(builder.driverMemory);
this.priority = Optional.ofNullable(builder.priority);
+ this.preserveEvaluators = Optional.ofNullable(builder.preserveEvaluators);
this.queue = Optional.ofNullable(builder.queue);
+ this.maxApplicationSubmissions =
Optional.ofNullable(builder.maxApplicationSubmissions);
}
@Override
@@ -94,6 +98,16 @@ public final class JobSubmissionEventImpl implements
JobSubmissionEvent {
}
@Override
+ public Optional<Boolean> getPreserveEvaluators() {
+ return preserveEvaluators;
+ }
+
+ @Override
+ public Optional<Integer> getMaxApplicationSubmissions() {
+ return maxApplicationSubmissions;
+ }
+
+ @Override
public Optional<String> getQueue() {
return queue;
}
@@ -115,6 +129,8 @@ public final class JobSubmissionEventImpl implements
JobSubmissionEvent {
private Integer driverMemory;
private Integer priority;
private String queue;
+ private Boolean preserveEvaluators;
+ private Integer maxApplicationSubmissions;
/**
* @see JobSubmissionEvent#getIdentifier()
@@ -183,6 +199,22 @@ public final class JobSubmissionEventImpl implements
JobSubmissionEvent {
}
/**
+ * @see JobSubmissionEvent#getPreserveEvaluators()
+ */
+ public Builder setPreserveEvaluators(final Boolean preserveEvaluators) {
+ this.preserveEvaluators = preserveEvaluators;
+ return this;
+ }
+
+ /**
+ * @see JobSubmissionEvent#getMaxApplicationSubmissions()
+ */
+ public Builder setMaxApplicationSubmissions(final Integer
maxApplicationSubmissions) {
+ this.maxApplicationSubmissions = maxApplicationSubmissions;
+ return this;
+ }
+
+ /**
* @see JobSubmissionEvent#getQueue()
* @deprecated in 0.12. Use
org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
*/
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 695ac8a..cbac9ea 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.driver;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
@@ -38,6 +39,10 @@ public final class DriverRuntimeRestartConfiguration extends
ConfigurationModule
}
public static final ConfigurationModule CONF = new DriverRuntimeRestartConfiguration()
+
+ // Automatically sets preserve evaluators to true.
+ .bindNamedParameter(ResourceManagerPreserveEvaluators.class, Boolean.toString(true))
+
.bindImplementation(DriverRestartManager.class, DriverRestartManagerImpl.class)
.bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, EvaluatorPreservingEvaluatorAllocatedHandler.class)
.bindSetEntry(ServiceEvaluatorFailedHandlers.class, EvaluatorPreservingEvaluatorFailedHandler.class)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index eda421f..ba35f41 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -109,7 +109,9 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
.setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
.setPriority(getPriority(jobSubmissionEvent))
.setQueue(getQueue(jobSubmissionEvent))
- .submit(jobSubmissionEvent.getRemoteId());
+ .setPreserveEvaluators(getPreserveEvaluators(jobSubmissionEvent))
+ .setMaxApplicationAttempts(getMaxApplicationSubmissions(jobSubmissionEvent))
+ .submit();
LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier());
} catch (final YarnException | IOException e) {
@@ -145,6 +147,20 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
}
/**
+ * Extract the information on whether or not the job should preserve evaluators across job driver restarts.
+ */
+ private Boolean getPreserveEvaluators(final JobSubmissionEvent jobSubmissionEvent) {
+ return jobSubmissionEvent.getPreserveEvaluators().orElse(false);
+ }
+
+ /**
+ * Extract the number of maximum application attempts on the job.
+ */
+ private Integer getMaxApplicationSubmissions(final JobSubmissionEvent jobSubmissionEvent) {
+ return jobSubmissionEvent.getMaxApplicationSubmissions().orElse(1);
+ }
+
+ /**
* Extracts the queue name from the driverConfiguration or return default if none is set.
*
* @param driverConfiguration
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index b5e932c..ca6a04f 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -53,7 +53,8 @@ public final class YarnSubmissionHelper implements Closeable{
private final Map<String, LocalResource> resources = new HashMap<>();
private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
-
+ private boolean preserveEvaluators;
+ private int maxAppSubmissions;
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
@@ -72,6 +73,8 @@ public final class YarnSubmissionHelper implements Closeable{
this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
this.applicationId = applicationSubmissionContext.getApplicationId();
+ this.maxAppSubmissions = 1;
+ this.preserveEvaluators = false;
LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
}
@@ -125,6 +128,47 @@ public final class YarnSubmissionHelper implements Closeable{
}
/**
+ * Set whether or not the resource manager should preserve evaluators across driver restarts.
+ * @param preserveEvaluators
+ * @return
+ */
+ public YarnSubmissionHelper setPreserveEvaluators(final boolean preserveEvaluators) {
+ if (preserveEvaluators) {
+ // when supported, set KeepContainersAcrossApplicationAttempts to be true
+ // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
+ if (YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE)) {
+ LOG.log(
+ Level.FINE,
+ "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported," +
+ " will set it to true.",
+ YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE);
+
+ applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
+ } else {
+ LOG.log(Level.WARNING,
+ "Hadoop version does not yet support KeepContainersAcrossApplicationAttempts. Driver restarts " +
+ "will not support recovering evaluators.");
+
+ applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
+ }
+ } else {
+ applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
+ }
+
+ return this;
+ }
+
+ /**
+ * Sets the maximum application attempts for the application.
+ * @param maxApplicationAttempts
+ * @return
+ */
+ public YarnSubmissionHelper setMaxApplicationAttempts(final int maxApplicationAttempts) {
+ applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts);
+ return this;
+ }
+
+ /**
* Assign this job submission to a queue.
* @param queueName
* @return
@@ -134,7 +178,7 @@ public final class YarnSubmissionHelper implements Closeable{
return this;
}
- public void submit(final String clientRemoteIdentifier) throws IOException, YarnException {
+ public void submit() throws IOException, YarnException {
// SET EXEC COMMAND
final List<String> launchCommand = new JavaLaunchCommandBuilder()
.setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
@@ -144,6 +188,12 @@ public final class YarnSubmissionHelper implements Closeable{
.setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName())
.build();
+ if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() &&
+ this.applicationSubmissionContext.getMaxAppAttempts() == 1) {
+ LOG.log(Level.WARNING, "Application will not be restarted even though preserve evaluators is set to true" +
+ " since the max application submissions is 1. Proceeding to submit application...");
+ }
+
this.applicationSubmissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand, this.resources));
@@ -153,20 +203,6 @@ public final class YarnSubmissionHelper implements Closeable{
LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
}
- // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
- final String minVersionKeepContainerOptionAvailable = "2.4.0";
-
- // when supported, set KeepContainersAcrossApplicationAttempts to be true
- // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
- if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
- LOG.log(
- Level.FINE,
- "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
- minVersionKeepContainerOptionAvailable);
-
- applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
- }
-
this.yarnClient.submitApplication(applicationSubmissionContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
index 72e058e..48347be 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
@@ -33,6 +33,9 @@ import java.util.Map;
@Private
public final class YarnTypes {
+ // TODO[REEF-537]: Remove once the hadoop version is updated.
+ public static final String MIN_VERSION_KEEP_CONTAINERS_AVAILABLE = "2.4.0";
+
private YarnTypes() {
}