Repository: incubator-reef
Updated Branches:
  refs/heads/master 63e0c074c -> d09250e2e


[REEF-631] Implement driver restart completion logic

This addresses the issue by:
  * Moving the restart completion logic into
    `DriverRestartManager.onRecoverEvaluator` instead of keeping it in
    `EvaluatorManager`.
  * Implementing the actual restart completion logic, which calls the
    `EvaluatorFailedHandler` and marks outstanding Evaluators as expired.

JIRA:
  [REEF-631](https://issues.apache.org/jira/browse/REEF-631)

Pull Request:
  This closes #415


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d09250e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d09250e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d09250e2

Branch: refs/heads/master
Commit: d09250e2e4dcd607b562f8f06cfdb50c35010c05
Parents: 63e0c07
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 25 22:26:19 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 26 14:56:29 2015 -0700

----------------------------------------------------------------------
 .../driver/restart/DriverRestartManager.java    | 75 +++++++++++++++++---
 .../evaluator/EvaluatorHeartbeatHandler.java    | 11 ++-
 .../driver/evaluator/EvaluatorManager.java      | 14 +---
 3 files changed, 76 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d09250e2/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 6e422a1..19f2b64 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -21,7 +21,12 @@ package org.apache.reef.driver.restart;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
+import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
 import org.apache.reef.exception.DriverFatalRuntimeException;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -38,13 +43,21 @@ import java.util.logging.Logger;
 public final class DriverRestartManager {
   private static final Logger LOG = 
Logger.getLogger(DriverRestartManager.class.getName());
   private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+  private final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers;
+  private final Set<EventHandler<DriverRestartCompleted>> 
serviceDriverRestartCompletedHandlers;
 
   private RestartEvaluators restartEvaluators;
   private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
 
   @Inject
-  private DriverRestartManager(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
+  private DriverRestartManager(final DriverRuntimeRestartManager 
driverRuntimeRestartManager,
+                               @Parameter(DriverRestartCompletedHandlers.class)
+                               final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers,
+                               
@Parameter(ServiceDriverRestartCompletedHandlers.class)
+                               final Set<EventHandler<DriverRestartCompleted>> 
serviceDriverRestartCompletedHandlers) {
     this.driverRuntimeRestartManager = driverRuntimeRestartManager;
+    this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
+    this.serviceDriverRestartCompletedHandlers = 
serviceDriverRestartCompletedHandlers;
   }
 
   /**
@@ -93,7 +106,7 @@ public final class DriverRestartManager {
 
     
driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
 
-    // TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on 
a Timer.
+    // TODO[REEF-560]: Call onDriverRestartCompleted() on a Timer.
   }
 
   /**
@@ -110,11 +123,10 @@ public final class DriverRestartManager {
 
   /**
    * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
-   * Calls the restart complete action if the latest evaluator is the last 
evaluator to recover.
-   * @return true if the driver restart is completed.
+   * @return true if the evaluator has been newly recovered.
    */
-  public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String 
evaluatorId) {
-    if (getStateOfPreviousEvaluator(evaluatorId) == 
EvaluatorRestartState.NOT_EXPECTED) {
+  public synchronized boolean onRecoverEvaluator(final String evaluatorId) {
+    if (getStateOfPreviousEvaluator(evaluatorId).isFailedOrNotExpected()) {
       final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " 
not expected to be alive.";
       LOG.log(Level.SEVERE, errMsg);
       throw new DriverFatalRuntimeException(errMsg);
@@ -123,11 +135,17 @@ public final class DriverRestartManager {
     if (getStateOfPreviousEvaluator(evaluatorId) != 
EvaluatorRestartState.EXPECTED) {
       LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " 
added to the set" +
           " of recovered evaluators more than once. Ignoring second add...");
-    } else {
-      setEvaluatorReported(evaluatorId);
+      return false;
+    }
+
+    // set the status for this evaluator ID to be reported.
+    setEvaluatorReported(evaluatorId);
+
+    if (haveAllExpectedEvaluatorsReported()) {
+      onDriverRestartCompleted();
     }
 
-    return haveAllExpectedEvaluatorsReported();
+    return true;
   }
 
   /**
@@ -212,6 +230,45 @@ public final class DriverRestartManager {
     return true;
   }
 
+  /**
+   * Sets the driver restart status to be completed if not yet set and 
notifies the restart completed event handlers.
+   */
+  private synchronized void onDriverRestartCompleted() {
+    if (this.state != DriverRestartState.COMPLETED) {
+      final Set<String> outstandingEvaluatorIds = 
getOutstandingEvaluatorsAndMarkExpired();
+      
driverRuntimeRestartManager.informAboutEvaluatorFailures(outstandingEvaluatorIds);
+
+      this.state = DriverRestartState.COMPLETED;
+      final DriverRestartCompleted driverRestartCompleted = new 
DriverRestartCompleted(System.currentTimeMillis());
+
+      for (final EventHandler<DriverRestartCompleted> 
serviceRestartCompletedHandler
+          : this.serviceDriverRestartCompletedHandlers) {
+        serviceRestartCompletedHandler.onNext(driverRestartCompleted);
+      }
+
+      for (final EventHandler<DriverRestartCompleted> restartCompletedHandler 
: this.driverRestartCompletedHandlers) {
+        restartCompletedHandler.onNext(driverRestartCompleted);
+      }
+
+      LOG.log(Level.FINE, "Restart completed. Evaluators that have not 
reported back are: " + outstandingEvaluatorIds);
+    }
+  }
+
+  /**
+   * Gets the outstanding evaluators that have not yet reported back and mark 
them as expired.
+   */
+  private Set<String> getOutstandingEvaluatorsAndMarkExpired() {
+    final Set<String> outstanding = new HashSet<>();
+    for (final String previousEvaluatorId : 
restartEvaluators.getEvaluatorIds()) {
+      if (getStateOfPreviousEvaluator(previousEvaluatorId) == 
EvaluatorRestartState.EXPECTED) {
+        outstanding.add(previousEvaluatorId);
+        setEvaluatorExpired(previousEvaluatorId);
+      }
+    }
+
+    return outstanding;
+  }
+
   private Set<String> getFailedEvaluators() {
     final Set<String> failed = new HashSet<>();
     for (final String previousEvaluatorId : 
this.restartEvaluators.getEvaluatorIds()) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d09250e2/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index 3d02ead..ee1af9f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -73,13 +73,20 @@ public final class EvaluatorHeartbeatHandler
 
       if (driverRestartManager.isRestarting() &&
           driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.EXPECTED) {
-        // TODO[REEF-617]: Create EvaluatorManager for recovered evaluator and 
call onEvaluatorHeartbeatMessage().
+
+        if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) {
+          LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported 
back to the driver after restart.");
+          // TODO[REEF-617]: Create EvaluatorManager, add to this.evaluators, 
and call onEvaluatorHeartbeatMessage().
+
+        } else {
+          LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already 
been recovered.");
+        }
         return;
       }
 
       if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.EXPIRED) {
         LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has 
reported back to the driver after restart.");
-        // TODO[REEF-617]: Create EvaluatorManager for expired evaluator and 
close it.
+        // TODO[REEF-617]: Create EvaluatorManager, call 
onEvaluatorHeartbeatMessage, and close it.
         return;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d09250e2/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 116a71d..bc8ebbf 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -34,7 +34,6 @@ import 
org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
 import org.apache.reef.io.naming.Identifiable;
 import org.apache.reef.proto.EvaluatorRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.runtime.common.DriverRestartCompleted;
 import org.apache.reef.driver.evaluator.EvaluatorProcess;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
@@ -355,18 +354,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
 
         if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
-
-          // TODO[REEF-617]: Move evaluator recovery to 
EvaluatorHeartbeatHandler and reregister evaluator here.
-          final boolean restartCompleted =
-              
this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId);
-
-          LOG.log(Level.FINE, "Received recovery heartbeat from evaluator 
{0}.", this.evaluatorId);
-
-          // TODO[REEF-617]: Move restart completion logic to 
DriverRestartManager.
-          if (restartCompleted) {
-            this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
-            LOG.log(Level.INFO, "All expected evaluators checked in.");
-          }
+          driverRestartManager.setEvaluatorReregistered(evaluatorId);
         }
       }
 

Reply via email to