Repository: incubator-reef
Updated Branches:
refs/heads/master fa9d49af4 -> 4d5ca8b07
[REEF-560] Add a configurable timeout for driver to recover evaluators on
restart
* Adding a timeout for the driver to recover evaluators on both the .NET and
Java sides.
* Adding DriverRestartManager as a DriverIdlenessSource to prevent the
Driver from exiting during restart.
JIRA:
[REEF-560](https://issues.apache.org/jira/browse/REEF-560)
Pull Request:
This closes #431
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4d5ca8b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4d5ca8b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4d5ca8b0
Branch: refs/heads/master
Commit: 4d5ca8b0782d28259d543789363956cc96d74f0d
Parents: fa9d49a
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 27 13:03:05 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 28 14:56:27 2015 -0700
----------------------------------------------------------------------
.../ClrClient2JavaClientCuratedParameters.cs | 7 ++-
.../Org.Apache.REEF.Client/YARN/YARNClient.cs | 2 +-
.../Bridge/DriverBridgeConfiguration.cs | 6 +--
.../Bridge/DriverBridgeConfigurationOptions.cs | 4 +-
.../DriverConfiguration.cs | 7 +--
.../bridge/client/YarnJobSubmissionClient.java | 37 ++++++++------
.../reef/client/DriverRestartConfiguration.java | 7 +++
.../DriverRestartEvaluatorRecoverySeconds.java | 49 +++++++++++++++++++
.../driver/restart/DriverRestartManager.java | 51 +++++++++++++++-----
.../DriverRuntimeRestartConfiguration.java | 6 +--
10 files changed, 134 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
index 897cdb1..9aa025d 100644
---
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
@@ -30,7 +30,6 @@ namespace Org.Apache.REEF.Client.Common
/// <summary>
/// Curated parameters for CLR to Java. Passes a set of command line
parameters to YarnJobSubmissionClient on
/// the Java side. The command line parameters should be strictly ordered.
- /// Note that the EnableRestart parameter will only be true if the user
ever binds a DriverRestartedHandler.
/// </summary>
internal class ClrClient2JavaClientCuratedParameters
{
@@ -39,7 +38,7 @@ namespace Org.Apache.REEF.Client.Common
public int TcpPortRangeTryCount { get; private set; }
public int TcpPortRangeSeed { get; private set; }
public int MaxApplicationSubmissions { get; private set; }
- public bool EnableRestart { get; private set; }
+ public int DriverRestartEvaluatorRecoverySeconds { get; private set; }
[Inject]
@@ -49,14 +48,14 @@ namespace Org.Apache.REEF.Client.Common
[Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
[Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))]
int maxApplicationSubmissions,
-
[Parameter(typeof(DriverBridgeConfigurationOptions.RestartEnabled))] bool
restartEnabled)
+
[Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds))]
int driverRestartEvaluatorRecoverySeconds)
{
TcpPortRangeStart = tcpPortRangeStart;
TcpPortRangeCount = tcpPortRangeCount;
TcpPortRangeTryCount = tcpPortRangeTryCount;
TcpPortRangeSeed = tcpPortRangeSeed;
MaxApplicationSubmissions = maxApplicationSubmissions;
- EnableRestart = restartEnabled;
+ this.DriverRestartEvaluatorRecoverySeconds =
driverRestartEvaluatorRecoverySeconds;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
index b415ccb..f7e27ad 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -69,7 +69,7 @@ namespace Org.Apache.REEF.Client.YARN
javaParams.TcpPortRangeCount.ToString(),
javaParams.TcpPortRangeTryCount.ToString(),
javaParams.MaxApplicationSubmissions.ToString(),
- javaParams.EnableRestart.ToString()
+ javaParams.DriverRestartEvaluatorRecoverySeconds.ToString()
);
Logger.Log(Level.Info, "Submitted the Driver for execution.");
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
index f4b5b23..8ba2ff0 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -181,10 +181,10 @@ namespace Org.Apache.REEF.Driver.Bridge
public static readonly
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted = new
OptionalImpl<IObserver<IDriverRestartCompleted>>();
/// <summary>
- /// Whether or not the application has restart enabled. Defaults to
false.
+ /// Evaluator recovery timeout in seconds for driver restart. If value
is greater than 0, restart is enabled. The default value is -1.
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:Do not declare read
only mutable reference types", Justification = "not applicable")]
- public static readonly OptionalParameter<bool> RestartEnabled = new
OptionalParameter<bool>();
+ public static readonly OptionalParameter<int>
DriverRestartEvaluatorRecoverySeconds = new OptionalParameter<int>();
// This is currently not needed in Bridge/Driver model
///// <summary>
@@ -239,7 +239,7 @@ namespace Org.Apache.REEF.Driver.Bridge
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
OnDriverRestartTaskRunning)
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers>.Class,
OnDriverRestartCompleted)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
-
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
RestartEnabled)
+
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
DriverRestartEvaluatorRecoverySeconds)
.Build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index ee72d37..013522d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -137,8 +137,8 @@ namespace Org.Apache.REEF.Driver.Bridge
{
}
- [NamedParameter("Whether restart should be enabled on the
application", "RestartEnabled", "false")]
- public class RestartEnabled : Name<bool>
+ [NamedParameter("Evaluator recovery timeout for driver restart in
seconds. > 0 => restart is enabled.", "DriverRestartEvaluatorRecoverySeconds",
"-1")]
+ public sealed class DriverRestartEvaluatorRecoverySeconds : Name<int>
{
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index ee2cff3..7e6c50c 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -174,9 +174,9 @@ namespace Org.Apache.REEF.Driver
public static readonly OptionalImpl<IDriverConnection>
OnDriverReconnect = new OptionalImpl<IDriverConnection>();
/// <summary>
- /// Whether or not the application has restart enabled. Defaults to
false.
+ /// Evaluator recovery timeout for driver restart in seconds. If value
is greater than 0, restart is enabled. The default value is -1.
/// </summary>
- public static readonly OptionalParameter<bool> RestartEnabled = new
OptionalParameter<bool>();
+ public static readonly OptionalParameter<int>
DriverRestartEvaluatorRecoverySeconds = new OptionalParameter<int>();
public static ConfigurationModule ConfigurationModule
{
@@ -223,7 +223,8 @@ namespace Org.Apache.REEF.Driver
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
MaxApplicationSubmissions)
-
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
RestartEnabled)
+
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
+ DriverRestartEvaluatorRecoverySeconds)
.Build()
// TODO: Move this up
.Set(OnDriverStarted,
GenericType<ClassHierarchyGeneratingDriverStartObserver>.Class)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 3a4aa9a..1f6b16d 100644
---
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -68,7 +68,7 @@ public final class YarnJobSubmissionClient {
private final YarnConfiguration yarnConfiguration;
private final ClasspathProvider classpath;
private final int maxApplicationSubmissions;
- private final boolean enableRestart;
+ private final int driverRestartEvaluatorRecoverySeconds;
private final SecurityTokenProvider tokenProvider;
@Inject
@@ -79,8 +79,8 @@ public final class YarnJobSubmissionClient {
final ClasspathProvider classpath,
@Parameter(MaxApplicationSubmissions.class)
final int maxApplicationSubmissions,
- @Parameter(EnableRestart.class)
- final boolean enableRestart,
+
@Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
+ final int driverRestartEvaluatorRecoverySeconds,
final SecurityTokenProvider tokenProvider) {
this.uploader = uploader;
this.configurationSerializer = configurationSerializer;
@@ -88,7 +88,7 @@ public final class YarnJobSubmissionClient {
this.yarnConfiguration = yarnConfiguration;
this.classpath = classpath;
this.maxApplicationSubmissions = maxApplicationSubmissions;
- this.enableRestart = enableRestart;
+ this.driverRestartEvaluatorRecoverySeconds =
driverRestartEvaluatorRecoverySeconds;
this.tokenProvider = tokenProvider;
}
@@ -108,7 +108,9 @@ public final class YarnJobSubmissionClient {
Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
yarnDriverConfiguration);
- if (this.enableRestart) {
+ if (driverRestartEvaluatorRecoverySeconds > 0) {
+ LOG.log(Level.FINE, "Driver restart is enabled.");
+
final Configuration yarnDriverRestartConfiguration =
YarnDriverRestartConfiguration.CONF
.build();
@@ -120,6 +122,8 @@ public final class YarnJobSubmissionClient {
JobDriver.DriverRestartActiveContextHandler.class)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
JobDriver.DriverRestartRunningTaskHandler.class)
+
.set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+ driverRestartEvaluatorRecoverySeconds)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
JobDriver.DriverRestartCompletedHandler.class)
.build();
@@ -212,7 +216,7 @@ public final class YarnJobSubmissionClient {
private static Configuration getRuntimeConfiguration(final int tcpBeginPort,
final int tcpRangeCount,
final int tcpTryCount,
- final boolean
enableRestart,
+ final int
driverRecoveryTimeout,
final int
maxApplicationSubmissions) {
final Configuration yarnClientConfig = YarnClientConfiguration.CONF
.build();
@@ -225,7 +229,8 @@ public final class YarnJobSubmissionClient {
.build();
final Configuration yarnJobSubmissionClientParamsConfig =
Tang.Factory.getTang().newConfigurationBuilder()
- .bindNamedParameter(EnableRestart.class,
Boolean.toString(enableRestart))
+
.bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class,
+ Integer.toString(driverRecoveryTimeout))
.bindNamedParameter(MaxApplicationSubmissions.class,
Integer.toString(maxApplicationSubmissions))
.build();
@@ -239,7 +244,7 @@ public final class YarnJobSubmissionClient {
* [2]: int. Driver memory.
* [3~5]: int. TCP configurations.
* [6]: int. Max application submissions.
- * [7]: boolean. Enable restart.
+ * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart
is enabled.
*/
public static void main(final String[] args) throws InjectionException,
IOException, YarnException {
final File driverFolder = new File(args[0]);
@@ -249,14 +254,14 @@ public final class YarnJobSubmissionClient {
final int tcpRangeCount = Integer.valueOf(args[4]);
final int tcpTryCount = Integer.valueOf(args[5]);
final int maxApplicationSubmissions = Integer.valueOf(args[6]);
- final boolean enableRestart = Boolean.valueOf(args[7]);
+ final int driverRecoveryTimeout = Integer.valueOf(args[7]);
// Static for now
final int priority = 1;
final String queue = "default";
final Configuration yarnConfiguration = getRuntimeConfiguration(
- tcpBeginPort, tcpRangeCount, tcpTryCount, enableRestart,
maxApplicationSubmissions);
+ tcpBeginPort, tcpRangeCount, tcpTryCount, driverRecoveryTimeout,
maxApplicationSubmissions);
final YarnJobSubmissionClient client = Tang.Factory.getTang()
.newInjector(yarnConfiguration)
.getInstance(YarnJobSubmissionClient.class);
@@ -266,10 +271,14 @@ public final class YarnJobSubmissionClient {
}
/**
- * Whether the resource manager should enable restart. Only used by C# job submission.
+ * How long the driver should wait before timing out on evaluator
+ * recovery in seconds. Defaults to -1. If value is not positive (<= 0), the restart functionality will not be
+ * enabled. Only used by .NET job submission.
*/
-@NamedParameter(doc = "Whether the job driver should enable restart", default_value = "false")
-final class EnableRestart implements Name<Boolean> {
- private EnableRestart() {
+@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
+ " recovery in seconds. Defaults to -1. If value is not positive (<= 0), the restart functionality will not be" +
+ " enabled. Only used by .NET job submission.", default_value = "-1")
+final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
+ private SubmissionDriverRestartEvaluatorRecoverySeconds() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index 225bd41..185440e 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -64,6 +64,12 @@ public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder
new OptionalImpl<>();
/**
+ * The amount of time in seconds the driver waits for evaluators to report
back on restart.
+ * Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, the
driver will wait forever.
+ */
+ public static final OptionalParameter<Integer>
DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS = new OptionalParameter<>();
+
+ /**
* Parameter to determine whether the driver should fail or continue if
there are evaluator
* preservation log failures. Defaults to false.
*/
@@ -72,6 +78,7 @@ public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder
public static final ConfigurationModule CONF = new
DriverRestartConfiguration()
.bindNamedParameter(FailDriverOnEvaluatorLogErrors.class,
FAIL_DRIVER_ON_EVALUATOR_LOG_ERROR)
+ .bindNamedParameter(DriverRestartEvaluatorRecoverySeconds.class,
DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS)
.bindSetEntry(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
.bindSetEntry(DriverRestartTaskRunningHandlers.class,
ON_DRIVER_RESTART_TASK_RUNNING)
.bindSetEntry(DriverRestartContextActiveHandlers.class,
ON_DRIVER_RESTART_CONTEXT_ACTIVE)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
new file mode 100644
index 0000000..14c2390
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Represents the amount of time in seconds that the driver restart waits for evaluators to report back.
+ * Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, the driver will wait forever until all
+ * expected evaluators report back or fail.
+ */
+@Unstable
+@NamedParameter(doc = "The amount of time in seconds that the driver restart waits for" +
+ " evaluators to report back. Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, " +
+ "the driver will wait forever until all expected evaluators report back or fail.",
+ default_value = DriverRestartEvaluatorRecoverySeconds.DEFAULT)
+public final class DriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
+
+ /**
+ * Integer.MAX_VALUE as a string: the driver waits forever until all expected evaluators report back or fail.
+ */
+ public static final String INFINITE = Integer.toString(Integer.MAX_VALUE);
+
+ /**
+ * Default restart wait for the driver is 3 minutes (180 seconds).
+ */
+ public static final String DEFAULT = "180";
+
+ private DriverRestartEvaluatorRecoverySeconds() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 52764e4..0557a2f 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -22,9 +22,12 @@ import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
+import org.apache.reef.driver.parameters.DriverRestartEvaluatorRecoverySeconds;
import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
@@ -41,17 +44,23 @@ import java.util.logging.Logger;
@DriverSide
@Private
@Unstable
-public final class DriverRestartManager {
- private static final Logger LOG =
Logger.getLogger(DriverRestartManager.class.getName());
+public final class DriverRestartManager implements DriverIdlenessSource {
+ private static final String CLASS_NAME =
DriverRestartManager.class.getName();
+ private static final Logger LOG = Logger.getLogger(CLASS_NAME);
+
private final DriverRuntimeRestartManager driverRuntimeRestartManager;
private final Set<EventHandler<DriverRestartCompleted>>
driverRestartCompletedHandlers;
private final Set<EventHandler<DriverRestartCompleted>>
serviceDriverRestartCompletedHandlers;
+ private final int driverRestartEvaluatorRecoverySeconds;
+ private final Timer restartCompletedTimer = new Timer();
private RestartEvaluators restartEvaluators;
private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
@Inject
private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager,
+ @Parameter(DriverRestartEvaluatorRecoverySeconds.class)
+ final int driverRestartEvaluatorRecoverySeconds,
@Parameter(DriverRestartCompletedHandlers.class)
final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
@Parameter(ServiceDriverRestartCompletedHandlers.class)
@@ -59,6 +68,11 @@ public final class DriverRestartManager {
this.driverRuntimeRestartManager = driverRuntimeRestartManager;
this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
this.serviceDriverRestartCompletedHandlers = serviceDriverRestartCompletedHandlers;
+ if (driverRestartEvaluatorRecoverySeconds < 0) {
+ throw new IllegalArgumentException("driverRestartEvaluatorRecoverySeconds must be greater than or equal to 0.");
+ }
+
+ this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
}
/**
@@ -76,14 +90,6 @@ public final class DriverRestartManager {
}
/**
- * @return true if the application is a restart instance.
- * Can be already done with restart or in the process of restart.
- */
- public synchronized boolean hasRestarted() {
- return this.state.hasRestarted();
- }
-
- /**
* @return true if the driver is undergoing the process of restart.
*/
public synchronized boolean isRestarting() {
@@ -107,7 +113,16 @@ public final class DriverRestartManager {
driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
- // TODO[REEF-560]: Call onDriverRestartCompleted() on a Timer.
+ if (driverRestartEvaluatorRecoverySeconds != Integer.MAX_VALUE) {
+ // Don't use Clock here because if there is an event scheduled, the
driver will not be idle, even if
+ // driver restart has already completed, and we cannot cancel the event.
+ restartCompletedTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ onDriverRestartCompleted();
+ }
+ }, driverRestartEvaluatorRecoverySeconds * 1000L);
+ }
}
/**
@@ -266,6 +281,8 @@ public final class DriverRestartManager {
LOG.log(Level.FINE, "Restart completed. Evaluators that have not
reported back are: " + outstandingEvaluatorIds);
}
+
+ restartCompletedTimer.cancel();
}
/**
@@ -293,4 +310,16 @@ public final class DriverRestartManager {
return failed;
}
+
+ /**
+ * {@inheritDoc}
+ * Synchronized for consistency with this class's other synchronized accessors (e.g. isRestarting()).
+ * @return an IdleMessage whose idle flag is true if not in the process of restart, false otherwise.
+ */
+ @Override
+ public synchronized IdleMessage getIdleStatus() {
+ final boolean idleState = !this.state.isRestarting();
+ final String idleMessage = idleState ? CLASS_NAME + " currently not in the process of restart." :
+ CLASS_NAME + " currently in the process of restart.";
+ return new IdleMessage(CLASS_NAME, idleMessage, idleState);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 0db7a54..2f6768a 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -20,10 +20,7 @@ package org.apache.reef.runtime.common.driver;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
-import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
-import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
-import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
+import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.restart.*;
import org.apache.reef.tang.formats.*;
@@ -43,6 +40,7 @@ public final class DriverRuntimeRestartConfiguration extends
ConfigurationModule
// Automatically sets preserve evaluators to true.
.bindNamedParameter(ResourceManagerPreserveEvaluators.class,
Boolean.toString(true))
+ .bindSetEntry(DriverIdleSources.class, DriverRestartManager.class)
.bindSetEntry(ServiceEvaluatorAllocatedHandlers.class,
EvaluatorPreservingEvaluatorAllocatedHandler.class)
.bindSetEntry(ServiceEvaluatorFailedHandlers.class,
EvaluatorPreservingEvaluatorFailedHandler.class)
.bindSetEntry(ServiceEvaluatorCompletedHandlers.class,
EvaluatorPreservingEvaluatorCompletedHandler.class)