Repository: incubator-reef
Updated Branches:
  refs/heads/master fa9d49af4 -> 4d5ca8b07


[REEF-560] Add a configurable timeout for the driver to recover evaluators on restart

  * Add a timeout for the driver to recover evaluators on both the .NET and
    Java sides.
  * Register DriverRestartManager as a DriverIdlenessSource to prevent the
    driver from exiting while a restart is in progress.

JIRA:
  [REEF-560](https://issues.apache.org/jira/browse/REEF-560)

Pull Request:
  This closes #431


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4d5ca8b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4d5ca8b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4d5ca8b0

Branch: refs/heads/master
Commit: 4d5ca8b0782d28259d543789363956cc96d74f0d
Parents: fa9d49a
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 27 13:03:05 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 28 14:56:27 2015 -0700

----------------------------------------------------------------------
 .../ClrClient2JavaClientCuratedParameters.cs    |  7 ++-
 .../Org.Apache.REEF.Client/YARN/YARNClient.cs   |  2 +-
 .../Bridge/DriverBridgeConfiguration.cs         |  6 +--
 .../Bridge/DriverBridgeConfigurationOptions.cs  |  4 +-
 .../DriverConfiguration.cs                      |  7 +--
 .../bridge/client/YarnJobSubmissionClient.java  | 37 ++++++++------
 .../reef/client/DriverRestartConfiguration.java |  7 +++
 .../DriverRestartEvaluatorRecoverySeconds.java  | 49 +++++++++++++++++++
 .../driver/restart/DriverRestartManager.java    | 51 +++++++++++++++-----
 .../DriverRuntimeRestartConfiguration.java      |  6 +--
 10 files changed, 134 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
 
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
index 897cdb1..9aa025d 100644
--- 
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++ 
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
@@ -30,7 +30,6 @@ namespace Org.Apache.REEF.Client.Common
     /// <summary>
     /// Curated parameters for CLR to Java. Passes a set of command line 
parameters to YarnJobSubmissionClient on
     /// the Java side. The command line parameters should be strictly ordered.
-    /// Note that the EnableRestart parameter will only be true if the user 
ever binds a DriverRestartedHandler.
     /// </summary>
     internal class ClrClient2JavaClientCuratedParameters
     {
@@ -39,7 +38,7 @@ namespace Org.Apache.REEF.Client.Common
         public int TcpPortRangeTryCount { get; private set; }
         public int TcpPortRangeSeed { get; private set; }
         public int MaxApplicationSubmissions { get; private set; }
-        public bool EnableRestart { get; private set; }
+        public int DriverRestartEvaluatorRecoverySeconds { get; private set; }
 
 
         [Inject]
@@ -49,14 +48,14 @@ namespace Org.Apache.REEF.Client.Common
             [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
             [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
             
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))] 
int maxApplicationSubmissions,
-            
[Parameter(typeof(DriverBridgeConfigurationOptions.RestartEnabled))] bool 
restartEnabled)
+            
[Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds))]
 int driverRestartEvaluatorRecoverySeconds)
         {
             TcpPortRangeStart = tcpPortRangeStart;
             TcpPortRangeCount = tcpPortRangeCount;
             TcpPortRangeTryCount = tcpPortRangeTryCount;
             TcpPortRangeSeed = tcpPortRangeSeed;
             MaxApplicationSubmissions = maxApplicationSubmissions;
-            EnableRestart = restartEnabled;
+            this.DriverRestartEvaluatorRecoverySeconds = 
driverRestartEvaluatorRecoverySeconds;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
index b415ccb..f7e27ad 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -69,7 +69,7 @@ namespace Org.Apache.REEF.Client.YARN
                 javaParams.TcpPortRangeCount.ToString(),
                 javaParams.TcpPortRangeTryCount.ToString(),
                 javaParams.MaxApplicationSubmissions.ToString(),
-                javaParams.EnableRestart.ToString()
+                javaParams.DriverRestartEvaluatorRecoverySeconds.ToString()
                 );
             Logger.Log(Level.Info, "Submitted the Driver for execution.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
index f4b5b23..8ba2ff0 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -181,10 +181,10 @@ namespace Org.Apache.REEF.Driver.Bridge
         public static readonly 
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted = new 
OptionalImpl<IObserver<IDriverRestartCompleted>>();
 
         /// <summary>
-        /// Whether or not the application has restart enabled. Defaults to 
false.
+        /// Evaluator recovery timeout in seconds for driver restart. If value 
is greater than 0, restart is enabled. The default value is -1.
         /// </summary>
         [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read 
only mutable reference types", Justification = "not applicable")]
-        public static readonly OptionalParameter<bool> RestartEnabled = new 
OptionalParameter<bool>();
+        public static readonly OptionalParameter<int> 
DriverRestartEvaluatorRecoverySeconds = new OptionalParameter<int>();
 
         // This is currently not needed in Bridge/Driver model
         ///// <summary>
@@ -239,7 +239,7 @@ namespace Org.Apache.REEF.Driver.Bridge
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
 OnDriverRestartTaskRunning)
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers>.Class,
 OnDriverRestartCompleted)
                 
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
-                
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
 RestartEnabled)
+                
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
 DriverRestartEvaluatorRecoverySeconds)
                 .Build();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index ee72d37..013522d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -137,8 +137,8 @@ namespace Org.Apache.REEF.Driver.Bridge
         {
         }
 
-        [NamedParameter("Whether restart should be enabled on the 
application", "RestartEnabled", "false")]
-        public class RestartEnabled : Name<bool>
+        [NamedParameter("Evaluator recovery timeout for driver restart in 
seconds. > 0 => restart is enabled.", "DriverRestartEvaluatorRecoverySeconds", 
"-1")]
+        public sealed class DriverRestartEvaluatorRecoverySeconds : Name<int>
         {
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index ee2cff3..7e6c50c 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -174,9 +174,9 @@ namespace Org.Apache.REEF.Driver
         public static readonly OptionalImpl<IDriverConnection> 
OnDriverReconnect = new OptionalImpl<IDriverConnection>();
 
         /// <summary>
-        /// Whether or not the application has restart enabled. Defaults to 
false.
+        /// Evaluator recovery timeout for driver restart in seconds. If value 
is greater than 0, restart is enabled. The default value is -1.
         /// </summary>
-        public static readonly OptionalParameter<bool> RestartEnabled = new 
OptionalParameter<bool>();
+        public static readonly OptionalParameter<int> 
DriverRestartEvaluatorRecoverySeconds = new OptionalParameter<int>();
 
         public static ConfigurationModule ConfigurationModule
         {
@@ -223,7 +223,8 @@ namespace Org.Apache.REEF.Driver
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
                         MaxApplicationSubmissions)
-                    
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
 RestartEnabled)
+                    
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
+                        DriverRestartEvaluatorRecoverySeconds)
                     .Build()
                     // TODO: Move this up
                     .Set(OnDriverStarted, 
GenericType<ClassHierarchyGeneratingDriverStartObserver>.Class)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 3a4aa9a..1f6b16d 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -68,7 +68,7 @@ public final class YarnJobSubmissionClient {
   private final YarnConfiguration yarnConfiguration;
   private final ClasspathProvider classpath;
   private final int maxApplicationSubmissions;
-  private final boolean enableRestart;
+  private final int driverRestartEvaluatorRecoverySeconds;
   private final SecurityTokenProvider tokenProvider;
 
   @Inject
@@ -79,8 +79,8 @@ public final class YarnJobSubmissionClient {
                           final ClasspathProvider classpath,
                           @Parameter(MaxApplicationSubmissions.class)
                           final int maxApplicationSubmissions,
-                          @Parameter(EnableRestart.class)
-                          final boolean enableRestart,
+                          
@Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
+                          final int driverRestartEvaluatorRecoverySeconds,
                           final SecurityTokenProvider tokenProvider) {
     this.uploader = uploader;
     this.configurationSerializer = configurationSerializer;
@@ -88,7 +88,7 @@ public final class YarnJobSubmissionClient {
     this.yarnConfiguration = yarnConfiguration;
     this.classpath = classpath;
     this.maxApplicationSubmissions = maxApplicationSubmissions;
-    this.enableRestart = enableRestart;
+    this.driverRestartEvaluatorRecoverySeconds = 
driverRestartEvaluatorRecoverySeconds;
     this.tokenProvider = tokenProvider;
   }
 
@@ -108,7 +108,9 @@ public final class YarnJobSubmissionClient {
         Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
         yarnDriverConfiguration);
 
-    if (this.enableRestart) {
+    if (driverRestartEvaluatorRecoverySeconds > 0) {
+      LOG.log(Level.FINE, "Driver restart is enabled.");
+
       final Configuration yarnDriverRestartConfiguration =
           YarnDriverRestartConfiguration.CONF
               .build();
@@ -120,6 +122,8 @@ public final class YarnJobSubmissionClient {
                   JobDriver.DriverRestartActiveContextHandler.class)
               .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
                   JobDriver.DriverRestartRunningTaskHandler.class)
+              
.set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+                  driverRestartEvaluatorRecoverySeconds)
               .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
                   JobDriver.DriverRestartCompletedHandler.class)
               .build();
@@ -212,7 +216,7 @@ public final class YarnJobSubmissionClient {
   private static Configuration getRuntimeConfiguration(final int tcpBeginPort,
                                                        final int tcpRangeCount,
                                                        final int tcpTryCount,
-                                                       final boolean 
enableRestart,
+                                                       final int 
driverRecoveryTimeout,
                                                        final int 
maxApplicationSubmissions) {
     final Configuration yarnClientConfig = YarnClientConfiguration.CONF
         .build();
@@ -225,7 +229,8 @@ public final class YarnJobSubmissionClient {
         .build();
 
     final Configuration yarnJobSubmissionClientParamsConfig = 
Tang.Factory.getTang().newConfigurationBuilder()
-        .bindNamedParameter(EnableRestart.class, 
Boolean.toString(enableRestart))
+        
.bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class,
+            Integer.toString(driverRecoveryTimeout))
         .bindNamedParameter(MaxApplicationSubmissions.class, 
Integer.toString(maxApplicationSubmissions))
         .build();
 
@@ -239,7 +244,7 @@ public final class YarnJobSubmissionClient {
    * [2]: int. Driver memory.
    * [3~5]: int. TCP configurations.
    * [6]: int. Max application submissions.
-   * [7]: boolean. Enable restart.
+   * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart 
is enabled.
    */
   public static void main(final String[] args) throws InjectionException, 
IOException, YarnException {
     final File driverFolder = new File(args[0]);
@@ -249,14 +254,14 @@ public final class YarnJobSubmissionClient {
     final int tcpRangeCount = Integer.valueOf(args[4]);
     final int tcpTryCount = Integer.valueOf(args[5]);
     final int maxApplicationSubmissions = Integer.valueOf(args[6]);
-    final boolean enableRestart = Boolean.valueOf(args[7]);
+    final int driverRecoveryTimeout = Integer.valueOf(args[7]);
 
     // Static for now
     final int priority = 1;
     final String queue = "default";
 
     final Configuration yarnConfiguration = getRuntimeConfiguration(
-        tcpBeginPort, tcpRangeCount, tcpTryCount, enableRestart, 
maxApplicationSubmissions);
+        tcpBeginPort, tcpRangeCount, tcpTryCount, driverRecoveryTimeout, 
maxApplicationSubmissions);
     final YarnJobSubmissionClient client = Tang.Factory.getTang()
         .newInjector(yarnConfiguration)
         .getInstance(YarnJobSubmissionClient.class);
@@ -266,10 +271,14 @@ public final class YarnJobSubmissionClient {
 }
 
 /**
- * Whether the resource manager should enable restart. Only used by C# job submission.
+ * How long the driver should wait before timing out on evaluator
+ * recovery in seconds. Defaults to -1. If the value is zero or negative, restart will not be
+ * enabled. Only used by .NET job submission.
  */
-@NamedParameter(doc = "Whether the job driver should enable restart", default_value = "false")
-final class EnableRestart implements Name<Boolean> {
-  private EnableRestart() {
+@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
+    " recovery in seconds. Defaults to -1. If the value is zero or negative, restart will not be" +
+    " enabled. Only used by .NET job submission.", default_value = "-1")
+final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
+  private SubmissionDriverRestartEvaluatorRecoverySeconds() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index 225bd41..185440e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -64,6 +64,12 @@ public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder
       new OptionalImpl<>();
 
   /**
+   * The amount of time in seconds the driver waits for evaluators to report 
back on restart.
+   * Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, the 
driver will wait forever.
+   */
+  public static final OptionalParameter<Integer> 
DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS = new OptionalParameter<>();
+
+  /**
    * Parameter to determine whether the driver should fail or continue if 
there are evaluator
    * preservation log failures. Defaults to false.
    */
@@ -72,6 +78,7 @@ public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder
 
   public static final ConfigurationModule CONF = new 
DriverRestartConfiguration()
       .bindNamedParameter(FailDriverOnEvaluatorLogErrors.class, 
FAIL_DRIVER_ON_EVALUATOR_LOG_ERROR)
+      .bindNamedParameter(DriverRestartEvaluatorRecoverySeconds.class, 
DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS)
       .bindSetEntry(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
       .bindSetEntry(DriverRestartTaskRunningHandlers.class, 
ON_DRIVER_RESTART_TASK_RUNNING)
       .bindSetEntry(DriverRestartContextActiveHandlers.class, 
ON_DRIVER_RESTART_CONTEXT_ACTIVE)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
new file mode 100644
index 0000000..14c2390
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartEvaluatorRecoverySeconds.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Represents the amount of time in seconds that the driver restart waits for evaluators to report back.
+ * Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, the driver will wait forever until all
+ * expected evaluators report back or fail.
+ */
+@Unstable
+@NamedParameter(doc = "The amount of time in seconds that the driver restart waits for" +
+    " evaluators to report back. Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, " +
+    "the driver will wait forever until all expected evaluators report back or fail.",
+    default_value = DriverRestartEvaluatorRecoverySeconds.DEFAULT)
+public final class DriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
+
+  /**
+   * Integer.MAX_VALUE as a string: the driver waits until all expected evaluators report back or fail.
+   */
+  public static final String INFINITE = Integer.toString(Integer.MAX_VALUE);
+
+  /**
+   * Default restart wait for the driver: 180 seconds (3 minutes).
+   */
+  public static final String DEFAULT = "180";
+
+  private DriverRestartEvaluatorRecoverySeconds() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 52764e4..0557a2f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -22,9 +22,12 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
+import org.apache.reef.driver.parameters.DriverRestartEvaluatorRecoverySeconds;
 import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
@@ -41,17 +44,23 @@ import java.util.logging.Logger;
 @DriverSide
 @Private
 @Unstable
-public final class DriverRestartManager {
-  private static final Logger LOG = 
Logger.getLogger(DriverRestartManager.class.getName());
+public final class DriverRestartManager implements DriverIdlenessSource {
+  private static final String CLASS_NAME = 
DriverRestartManager.class.getName();
+  private static final Logger LOG = Logger.getLogger(CLASS_NAME);
+
   private final DriverRuntimeRestartManager driverRuntimeRestartManager;
   private final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers;
   private final Set<EventHandler<DriverRestartCompleted>> 
serviceDriverRestartCompletedHandlers;
+  private final int driverRestartEvaluatorRecoverySeconds;
+  private final Timer restartCompletedTimer = new Timer();
 
   private RestartEvaluators restartEvaluators;
   private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
 
   @Inject
   private DriverRestartManager(final DriverRuntimeRestartManager 
driverRuntimeRestartManager,
+                               
@Parameter(DriverRestartEvaluatorRecoverySeconds.class)
+                               final int driverRestartEvaluatorRecoverySeconds,
                                @Parameter(DriverRestartCompletedHandlers.class)
                                final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers,
                                
@Parameter(ServiceDriverRestartCompletedHandlers.class)
@@ -59,6 +68,11 @@ public final class DriverRestartManager {
     this.driverRuntimeRestartManager = driverRuntimeRestartManager;
     this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
     this.serviceDriverRestartCompletedHandlers = 
serviceDriverRestartCompletedHandlers;
+    if (driverRestartEvaluatorRecoverySeconds < 0) {
+      throw new 
IllegalArgumentException("driverRestartEvaluatorRecoverySeconds must be greater 
than 0.");
+    }
+
+    this.driverRestartEvaluatorRecoverySeconds = 
driverRestartEvaluatorRecoverySeconds;
   }
 
   /**
@@ -76,14 +90,6 @@ public final class DriverRestartManager {
   }
 
   /**
-   * @return true if the application is a restart instance.
-   * Can be already done with restart or in the process of restart.
-   */
-  public synchronized boolean hasRestarted() {
-    return this.state.hasRestarted();
-  }
-
-  /**
    * @return true if the driver is undergoing the process of restart.
    */
   public synchronized boolean isRestarting() {
@@ -107,7 +113,16 @@ public final class DriverRestartManager {
 
     
driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
 
-    // TODO[REEF-560]: Call onDriverRestartCompleted() on a Timer.
+    if (driverRestartEvaluatorRecoverySeconds != Integer.MAX_VALUE) {
+      // Don't use Clock here because if there is an event scheduled, the 
driver will not be idle, even if
+      // driver restart has already completed, and we cannot cancel the event.
+      restartCompletedTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          onDriverRestartCompleted();
+        }
+      }, driverRestartEvaluatorRecoverySeconds * 1000L);
+    }
   }
 
   /**
@@ -266,6 +281,8 @@ public final class DriverRestartManager {
 
       LOG.log(Level.FINE, "Restart completed. Evaluators that have not 
reported back are: " + outstandingEvaluatorIds);
     }
+
+    restartCompletedTimer.cancel();
   }
 
   /**
@@ -293,4 +310,16 @@ public final class DriverRestartManager {
 
     return failed;
   }
+
+  /**
+   * {@inheritDoc}
+   * @return an IdleMessage reporting idle unless the driver is currently restarting. Reads {@code state} unlocked.
+   */
+  @Override
+  public IdleMessage getIdleStatus() {
+    boolean idleState = !this.state.isRestarting();
+    final String idleMessage = idleState ? CLASS_NAME + " currently not in the 
process of restart." :
+        CLASS_NAME + " currently in the process of restart.";
+    return new IdleMessage(CLASS_NAME, idleMessage, idleState);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4d5ca8b0/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 0db7a54..2f6768a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -20,10 +20,7 @@ package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
-import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
-import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
-import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
+import org.apache.reef.driver.parameters.*;
 import org.apache.reef.driver.restart.*;
 import org.apache.reef.tang.formats.*;
 
@@ -43,6 +40,7 @@ public final class DriverRuntimeRestartConfiguration extends 
ConfigurationModule
       // Automatically sets preserve evaluators to true.
       .bindNamedParameter(ResourceManagerPreserveEvaluators.class, 
Boolean.toString(true))
 
+      .bindSetEntry(DriverIdleSources.class, DriverRestartManager.class)
       .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, 
EvaluatorPreservingEvaluatorAllocatedHandler.class)
       .bindSetEntry(ServiceEvaluatorFailedHandlers.class, 
EvaluatorPreservingEvaluatorFailedHandler.class)
       .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, 
EvaluatorPreservingEvaluatorCompletedHandler.class)

Reply via email to