Repository: incubator-reef
Updated Branches:
  refs/heads/master 35674a3df -> 1dc1cf3a5


[REEF-559] Tighten previous evaluator ID checks by using entire set of 
evaluator IDs

This addresses the issue by
  * Keeping the set of evaluator IDs that are expected to report back to
    the driver on restart as well as the set of evaluator IDs that have
    reported back.
  * Using these two sets to determine whether the restart has completed
    or not.

JIRA:
  [REEF-559](https://issues.apache.org/jira/browse/REEF-559)

Pull Request:
  This closes #351


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1dc1cf3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1dc1cf3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1dc1cf3a

Branch: refs/heads/master
Commit: 1dc1cf3a55335a7948b2602ab5eb2b3bce947726
Parents: 35674a3
Author: Andrew Chung <[email protected]>
Authored: Fri Aug 7 11:33:33 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 7 18:02:22 2015 -0700

----------------------------------------------------------------------
 .../driver/restart/DriverRestartManager.java    | 25 ++++----
 .../restart/DriverRestartManagerImpl.java       | 64 ++++++++++++--------
 .../driver/evaluator/EvaluatorManager.java      | 21 +++----
 3 files changed, 59 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1dc1cf3a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 99bd7b0..57c31c8 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -22,6 +22,8 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
+import java.util.Set;
+
 /**
  * The manager that handles aspects of driver restart such as determining 
whether the driver is in
  * restart mode, what to do on restart, whether restart is completed, and 
others.
@@ -44,35 +46,34 @@ public interface DriverRestartManager {
   void onRestart();
 
   /**
-   * Indicate that the Driver restart is complete. It is meant to be called 
exactly once during a restart and never
-   * during the ininital launch of a Driver.
+   * @return whether restart is completed.
    */
-  void setRestartCompleted();
+  boolean isRestartCompleted();
 
   /**
-   * @return the number of Evaluators expected to check in from a previous run.
+   * @return the Evaluators expected to check in from a previous run.
    */
-  int getNumPreviousContainers();
-
+  Set<String> getPreviousEvaluatorIds();
 
   /**
-   * Set the number of containers to expect still active from a previous 
execution of the Driver in a restart situation.
+   * Set the Evaluators to expect still active from a previous execution of 
the Driver in a restart situation.
    * To be called exactly once during a driver restart.
    *
-   * @param num
+   * @param ids the evaluator IDs of the evaluators that are expected to have 
survived driver restart.
    */
-  void setNumPreviousContainers(final int num);
+  void setPreviousEvaluatorIds(final Set<String> ids);
 
   /**
-   * @return the number of Evaluators from a previous Driver that have checked 
in with the Driver
+   * @return the IDs of the Evaluators from a previous Driver that have 
checked in with the Driver
    * in a restart situation.
    */
-  int getNumRecoveredContainers();
+  Set<String> getRecoveredEvaluatorIds();
 
   /**
    * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
+   * @return true if the driver restart is completed.
    */
-  void oneContainerRecovered();
+  boolean evaluatorRecovered(final String id);
 
   /**
    * Records the evaluators when it is allocated. The implementation depends 
on the runtime.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1dc1cf3a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
index 0d7a75a..2b07027 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
@@ -21,8 +21,12 @@ package org.apache.reef.driver.restart;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.exception.DriverFatalRuntimeException;
 
 import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,17 +40,19 @@ import java.util.logging.Logger;
 public final class DriverRestartManagerImpl implements DriverRestartManager {
   private static final Logger LOG = 
Logger.getLogger(DriverRestartManagerImpl.class.getName());
   private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+  private final Set<String> previousEvaluators;
+  private final Set<String> recoveredEvaluators;
 
+  private boolean restartBegan;
   private boolean restartCompleted;
-  private int numPreviousContainers;
-  private int numRecoveredContainers;
 
   @Inject
   private DriverRestartManagerImpl(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
     this.driverRuntimeRestartManager = driverRuntimeRestartManager;
     this.restartCompleted = false;
-    this.numPreviousContainers = -1;
-    this.numRecoveredContainers = 0;
+    this.restartBegan = false;
+    this.previousEvaluators = new HashSet<>();
+    this.recoveredEvaluators = new HashSet<>();
   }
 
   @Override
@@ -57,46 +63,54 @@ public final class DriverRestartManagerImpl implements 
DriverRestartManager {
   @Override
   public void onRestart() {
     final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
-    setNumPreviousContainers(evaluatorRestartInfo.getAliveEvaluators().size());
+    setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
     
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
   }
 
   @Override
-  public synchronized void setRestartCompleted() {
-    if (this.restartCompleted) {
-      LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
-    } else {
-      this.restartCompleted = true;
-    }
+  public boolean isRestartCompleted() {
+    return this.restartCompleted;
   }
 
   @Override
-  public synchronized int getNumPreviousContainers() {
-    return this.numPreviousContainers;
+  public synchronized Set<String> getPreviousEvaluatorIds() {
+    return Collections.unmodifiableSet(this.previousEvaluators);
   }
 
   @Override
-  public synchronized void setNumPreviousContainers(final int num) {
-    if (this.numPreviousContainers >= 0) {
-      throw new IllegalStateException("Attempting to set the number of 
expected containers left " +
-          "from a previous container more than once.");
+  public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
+    if (!this.restartBegan) {
+      previousEvaluators.addAll(ids);
     } else {
-      this.numPreviousContainers = num;
+      final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
+      LOG.log(Level.SEVERE, errMsg);
+      throw new DriverFatalRuntimeException(errMsg);
     }
   }
 
   @Override
-  public synchronized int getNumRecoveredContainers() {
-    return this.numRecoveredContainers;
+  public synchronized Set<String> getRecoveredEvaluatorIds() {
+    return Collections.unmodifiableSet(this.previousEvaluators);
   }
 
   @Override
-  public synchronized void oneContainerRecovered() {
-    this.numRecoveredContainers += 1;
-    if (this.numRecoveredContainers > this.numPreviousContainers) {
-      throw new IllegalStateException("Reconnected to" +
-          this.numRecoveredContainers + "Evaluators while only expecting " + 
this.numPreviousContainers);
+  public synchronized boolean evaluatorRecovered(final String evaluatorId) {
+    if (!this.previousEvaluators.contains(evaluatorId)) {
+      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " 
not expected to be alive.";
+      LOG.log(Level.SEVERE, errMsg);
+      throw new DriverFatalRuntimeException(errMsg);
+    }
+
+    if (!this.recoveredEvaluators.add(evaluatorId)) {
+      LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " 
added to the set" +
+          " of recovered evaluators more than once. Ignoring second add...");
     }
+
+    if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
+      this.restartCompleted = true;
+    }
+
+    return this.restartCompleted;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1dc1cf3a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index e73cfef..d546e59 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -397,24 +397,17 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
           this.evaluatorControlHandler.setRemoteID(evaluatorRID);
           this.stateManager.setRunning();
 
-          this.driverRestartManager.get().oneContainerRecovered();
-          final int numRecoveredContainers = 
this.driverRestartManager.get().getNumRecoveredContainers();
+          boolean restartCompleted = 
this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId);
 
           LOG.log(Level.FINE, "Received recovery heartbeat from evaluator 
{0}.", this.evaluatorId);
-          final int expectedEvaluatorsNumber = 
this.driverRestartManager.get().getNumPreviousContainers();
-
-          if (numRecoveredContainers > expectedEvaluatorsNumber) {
-            LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, 
but [{1}] evaluators have checked in.",
-                new Object[]{expectedEvaluatorsNumber, 
numRecoveredContainers});
-            throw new RuntimeException("More then expected number of 
evaluators are checking in during recovery.");
-          } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
-            LOG.log(Level.INFO, "All [{0}] expected evaluators have checked 
in. Recovery completed.",
-                expectedEvaluatorsNumber);
-            this.driverRestartManager.get().setRestartCompleted();
+
+          if (restartCompleted) {
             this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
+            LOG.log(Level.INFO, "All expected evaluators checked in.");
           } else {
-            LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] 
evaluators have checked in.",
-                new Object[]{expectedEvaluatorsNumber, 
numRecoveredContainers});
+            LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
+                new 
Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(),
+                    
this.driverRestartManager.get().getRecoveredEvaluatorIds()});
           }
         } else {
           final String errorMsg = "Restart configurations are not set 
properly. The DriverRestartManager is missing.";

Reply via email to