Repository: incubator-reef
Updated Branches:
  refs/heads/master 4b7288de8 -> 2d67da786


[REEF-616] Keep state of previous evaluators with a state machine

This switches to a state machine model for tracking evaluator statuses during 
driver restart.

JIRA:
  [REEF-616](https://issues.apache.org/jira/browse/REEF-616)

Pull Request:
  This closes #396


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2d67da78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2d67da78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2d67da78

Branch: refs/heads/master
Commit: 2d67da786f4ec4b4dcb885ab471c78b8f91d8dbf
Parents: 4b7288d
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 20 15:32:25 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 21 11:25:13 2015 -0700

----------------------------------------------------------------------
 .../DefaultDriverRuntimeRestartMangerImpl.java  |   2 +-
 .../driver/restart/DriverRestartManager.java    | 138 ++++++++++++++-----
 .../reef/driver/restart/DriverRestartState.java |   7 +
 .../driver/restart/DriverRestartUtilities.java  |  44 ------
 .../driver/restart/EvaluatorRestartState.java   | 106 ++++++++++++++
 .../driver/context/ContextRepresenters.java     |   4 +-
 .../evaluator/EvaluatorHeartbeatHandler.java    |  34 ++++-
 .../driver/evaluator/EvaluatorManager.java      |  55 +++++---
 .../common/driver/task/TaskRepresenter.java     |   7 +-
 9 files changed, 282 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
index 22714c5..12934f7 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
@@ -29,7 +29,7 @@ import java.util.Set;
 /**
  * The default driver runtime restart manager that is not able to perform any 
restart actions.
  * Thus, when performing actions pertaining to restart, it is recommended to 
call
- * {@link DriverRuntimeRestartManager#hasRestarted()} first or use static 
functions in {@link DriverRestartUtilities}.
+ * {@link DriverRuntimeRestartManager#hasRestarted()} first.
  */
 @Private
 @DriverSide

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 54a2c54..3e1be1f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -24,9 +24,7 @@ import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 
 import javax.inject.Inject;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -40,16 +38,12 @@ import java.util.logging.Logger;
 public final class DriverRestartManager {
   private static final Logger LOG = 
Logger.getLogger(DriverRestartManager.class.getName());
   private final DriverRuntimeRestartManager driverRuntimeRestartManager;
-  private final Set<String> previousEvaluators;
-  private final Set<String> recoveredEvaluators;
-  private DriverRestartState state;
+  private final Map<String, EvaluatorRestartState> previousEvaluators = new 
HashMap<>();
+  private DriverRestartState state = DriverRestartState.NotRestarted;
 
   @Inject
   private DriverRestartManager(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
     this.driverRuntimeRestartManager = driverRuntimeRestartManager;
-    this.state = DriverRestartState.NotRestarted;
-    this.previousEvaluators = new HashSet<>();
-    this.recoveredEvaluators = new HashSet<>();
   }
 
   /**
@@ -58,7 +52,8 @@ public final class DriverRestartManager {
    * Can be already done with restart or in the process of restart.
    */
   public synchronized boolean detectRestart() {
-    if (!this.state.hasRestarted() && 
driverRuntimeRestartManager.hasRestarted()) {
+    if (this.state.hasNotRestarted() && 
driverRuntimeRestartManager.hasRestarted()) {
+      // set the state machine in motion.
       this.state = DriverRestartState.RestartBegan;
     }
 
@@ -89,31 +84,34 @@ public final class DriverRestartManager {
     final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
     setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
     
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+    // TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on 
a Timer.
   }
 
   /**
-   * @return whether restart is completed.
+   * @return The restart state of the specified evaluator. Returns {@link 
EvaluatorRestartState#NOT_EXPECTED}
+   * if the {@link DriverRestartManager} does not believe that it's an 
evaluator to be recovered.
    */
-  public synchronized boolean isRestartCompleted() {
-    return this.state == DriverRestartState.RestartCompleted;
-  }
+  public synchronized EvaluatorRestartState getEvaluatorRestartState(final 
String evaluatorId) {
+    if (this.state.hasNotRestarted() ||
+        !this.previousEvaluators.containsKey(evaluatorId)) {
+      return EvaluatorRestartState.NOT_EXPECTED;
+    }
 
-  /**
-   * @return the Evaluators expected to check in from a previous run.
-   */
-  public synchronized Set<String> getPreviousEvaluatorIds() {
-    return Collections.unmodifiableSet(this.previousEvaluators);
+    return this.previousEvaluators.get(evaluatorId);
   }
 
   /**
    * Set the Evaluators to expect still active from a previous execution of 
the Driver in a restart situation.
    * To be called exactly once during a driver restart.
    *
-   * @param ids the evaluator IDs of the evaluators that are expected to have 
survived driver restart.
+   * @param previousEvaluatorIds the evaluator IDs of the evaluators that are 
expected to have survived driver restart.
    */
-  public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
-    if (this.state != DriverRestartState.RestartInProgress) {
-      previousEvaluators.addAll(ids);
+  private synchronized void setPreviousEvaluatorIds(final Set<String> 
previousEvaluatorIds) {
+    if (this.state == DriverRestartState.RestartBegan) {
+      for (final String previousEvaluatorId : previousEvaluatorIds) {
+        setEvaluatorExpected(previousEvaluatorId);
+      }
+
       this.state = DriverRestartState.RestartInProgress;
     } else {
       final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
@@ -123,34 +121,26 @@ public final class DriverRestartManager {
   }
 
   /**
-   * @return the IDs of the Evaluators from a previous Driver that have 
checked in with the Driver
-   * in a restart situation.
-   */
-  public synchronized Set<String> getRecoveredEvaluatorIds() {
-    return Collections.unmodifiableSet(this.previousEvaluators);
-  }
-
-  /**
    * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
+   * Calls the restart complete action if the latest evaluator is the last 
evaluator to recover.
    * @return true if the driver restart is completed.
    */
   public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String 
evaluatorId) {
-    if (!this.previousEvaluators.contains(evaluatorId)) {
+    if (!this.previousEvaluators.containsKey(evaluatorId) ||
+        this.previousEvaluators.get(evaluatorId) == 
EvaluatorRestartState.NOT_EXPECTED) {
       final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " 
not expected to be alive.";
       LOG.log(Level.SEVERE, errMsg);
       throw new DriverFatalRuntimeException(errMsg);
     }
 
-    if (!this.recoveredEvaluators.add(evaluatorId)) {
+    if (this.previousEvaluators.get(evaluatorId) != 
EvaluatorRestartState.EXPECTED) {
       LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " 
added to the set" +
           " of recovered evaluators more than once. Ignoring second add...");
+    } else {
+      setEvaluatorReported(evaluatorId);
     }
 
-    if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
-      this.state = DriverRestartState.RestartCompleted;
-    }
-
-    return this.state == DriverRestartState.RestartCompleted;
+    return haveAllExpectedEvaluatorsReported();
   }
 
   /**
@@ -168,4 +158,76 @@ public final class DriverRestartManager {
   public synchronized void recordRemovedEvaluator(final String id) {
     driverRuntimeRestartManager.recordRemovedEvaluator(id);
   }
+
+  /**
+   * Signals to the {@link DriverRestartManager} that an evaluator is to be 
expected to report back after restart.
+   */
+  public synchronized void setEvaluatorExpected(final String evaluatorId) {
+    if (previousEvaluators.containsKey(evaluatorId)) {
+      LOG.log(Level.WARNING, "Evaluator " + evaluatorId + " is already added 
to the set of previous evaluators with " +
+          "state [" + previousEvaluators.get(evaluatorId) + "]. Ignoring...");
+      return;
+    }
+
+    previousEvaluators.put(evaluatorId, EvaluatorRestartState.EXPECTED);
+  }
+
+  /**
+   * Signals to the {@link DriverRestartManager} that an evaluator has 
reported back after restart.
+   */
+  public synchronized void setEvaluatorReported(final String evaluatorId) {
+    setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REPORTED);
+  }
+
+  /**
+   * Signals to the {@link DriverRestartManager} that an evaluator has had its 
recovery heartbeat processed.
+   */
+  public synchronized void setEvaluatorReregistered(final String evaluatorId) {
+    setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REREGISTERED);
+  }
+
+  /**
+   * Signals to the {@link DriverRestartManager} that an evaluator has had its 
running task processed.
+   */
+  public synchronized void setEvaluatorRunningTask(final String evaluatorId) {
+    setPreviousEvaluatorState(
+        evaluatorId, EvaluatorRestartState.PROCESSED);
+  }
+
+  /**
+   * Signals to the {@link DriverRestartManager} that an expected evaluator 
has been expired.
+   */
+  public synchronized void setEvaluatorExpired(final String evaluatorId) {
+    setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.EXPIRED);
+  }
+
+  private synchronized void setPreviousEvaluatorState(final String evaluatorId,
+                                                      final 
EvaluatorRestartState to) {
+    if (!previousEvaluators.containsKey(evaluatorId) ||
+        
!EvaluatorRestartState.isLegalTransition(previousEvaluators.get(evaluatorId), 
to)) {
+      throw evaluatorTransitionFailed(evaluatorId, to);
+    }
+
+    previousEvaluators.put(evaluatorId, to);
+  }
+
+  private synchronized DriverFatalRuntimeException 
evaluatorTransitionFailed(final String evaluatorId,
+                                                                             
final EvaluatorRestartState to) {
+    if (!previousEvaluators.containsKey(evaluatorId)) {
+      return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is 
not expected.");
+    }
+
+    return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " 
wants to transition to state " +
+        "[" + to + "], but is in the illegal state [" + 
previousEvaluators.get(evaluatorId) + "].");
+  }
+
+  private synchronized boolean haveAllExpectedEvaluatorsReported() {
+    for (final EvaluatorRestartState evaluatorRestartState : 
this.previousEvaluators.values()) {
+      if (!evaluatorRestartState.hasReported()) {
+        return false;
+      }
+    }
+
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
index 5ffd741..a20ec4c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
@@ -69,4 +69,11 @@ public enum DriverRestartState {
   public boolean hasRestarted() {
     return this != NotRestarted;
   }
+
+  /**
+   * the negation of {@link #hasRestarted()}.
+   */
+  public boolean hasNotRestarted() {
+    return !this.hasRestarted();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
deleted file mode 100644
index e8cdd33..0000000
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.driver.restart;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-/**
- * A static utilities class for simplifying calls to driver restart manager.
- */
-@Private
-@DriverSide
-@Unstable
-public final class DriverRestartUtilities {
-
-  /**
-   * Helper function for driver restart to determine whether an evaluator ID 
is from an evaluator from the
-   * previous application attempt.
-   */
-  public static boolean isRestartAndIsPreviousEvaluator(final 
DriverRestartManager driverRestartManager,
-                                                        final String 
evaluatorId) {
-    return driverRestartManager.hasRestarted() && 
driverRestartManager.getPreviousEvaluatorIds().contains(evaluatorId);
-  }
-
-  private DriverRestartUtilities() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
new file mode 100644
index 0000000..4a0c540
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * The state that the evaluator is in in the driver restart process.
+ */
+@Private
+@DriverSide
+@Unstable
+public enum EvaluatorRestartState {
+  /**
+   * The evaluator is not a restarted instance. Not expecting.
+   */
+  NOT_EXPECTED,
+
+  /**
+   * Have not yet heard back from an evaluator, but we are expecting it to 
report back.
+   */
+  EXPECTED,
+
+  /**
+   * Received the evaluator heartbeat, but have not yet processed it.
+   */
+  REPORTED,
+
+  /**
+   * The evaluator has had its recovery heartbeat processed.
+   */
+  REREGISTERED,
+
+  /**
+   * The evaluator has had its context/running task processed.
+   */
+  PROCESSED,
+
+  /**
+   * The evaluator has only contacted the driver after the expiration period.
+   */
+  EXPIRED;
+
+  /**
+   * @return true if the transition of {@link EvaluatorRestartState} is legal.
+   */
+  public static boolean isLegalTransition(final EvaluatorRestartState from, 
final EvaluatorRestartState to) {
+    switch(from) {
+    case EXPECTED:
+      switch(to) {
+      case REPORTED:
+        return true;
+      default:
+        return false;
+      }
+    case REPORTED:
+      switch(to) {
+      case REREGISTERED:
+        return true;
+      default:
+        return false;
+      }
+    case REREGISTERED:
+      switch(to) {
+      case PROCESSED:
+        return true;
+      default:
+        return false;
+      }
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * @return true if the evaluator has heartbeated back to the driver.
+   */
+  public boolean hasReported() {
+    switch(this) {
+    case REPORTED:
+    case REREGISTERED:
+    case PROCESSED:
+      return true;
+    default:
+      return false;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index 41bdd43..8abc19d 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -24,7 +24,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.FailedContext;
 import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.driver.restart.DriverRestartUtilities;
+import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.proto.ReefServiceProtos;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
 import org.apache.reef.util.Optional;
@@ -215,7 +215,7 @@ public final class ContextRepresenters {
         Optional.of(contextStatusProto.getParentId()) : 
Optional.<String>empty();
     final EvaluatorContext context = contextFactory.newContext(contextID, 
parentID);
     this.addContext(context);
-    if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
context.getEvaluatorId())) {
+    if 
(driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) == 
EvaluatorRestartState.REREGISTERED) {
       // when we get a recovered active context, always notify application
       this.messageDispatcher.onDriverRestartContextActive(context);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index 8225a0a..3d02ead 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -20,6 +20,8 @@ package org.apache.reef.runtime.common.driver.evaluator;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.proto.EvaluatorRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.util.Optional;
@@ -40,11 +42,15 @@ public final class EvaluatorHeartbeatHandler
   private static final Logger LOG = 
Logger.getLogger(EvaluatorHeartbeatHandler.class.getName());
   private final Evaluators evaluators;
   private final EvaluatorManagerFactory evaluatorManagerFactory;
+  private final DriverRestartManager driverRestartManager;
 
   @Inject
-  EvaluatorHeartbeatHandler(final Evaluators evaluators, final 
EvaluatorManagerFactory evaluatorManagerFactory) {
+  EvaluatorHeartbeatHandler(final Evaluators evaluators,
+                            final EvaluatorManagerFactory 
evaluatorManagerFactory,
+                            final DriverRestartManager driverRestartManager) {
     this.evaluators = evaluators;
     this.evaluatorManagerFactory = evaluatorManagerFactory;
+    this.driverRestartManager = driverRestartManager;
   }
 
   @Override
@@ -58,10 +64,25 @@ public final class EvaluatorHeartbeatHandler
         new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(),
             evaluatorHeartbeatMessage.getIdentifier()});
 
-    final Optional<EvaluatorManager> evaluatorManager = 
this.evaluators.get(evaluatorId);
-    if (evaluatorManager.isPresent()) {
-      
evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
-    } else {
+    try {
+      final Optional<EvaluatorManager> evaluatorManager = 
this.evaluators.get(evaluatorId);
+      if (evaluatorManager.isPresent()) {
+        
evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
+        return;
+      }
+
+      if (driverRestartManager.isRestarting() &&
+          driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.EXPECTED) {
+        // TODO[REEF-617]: Create EvaluatorManager for recovered evaluator and 
call onEvaluatorHeartbeatMessage().
+        return;
+      }
+
+      if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.EXPIRED) {
+        LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has 
reported back to the driver after restart.");
+        // TODO[REEF-617]: Create EvaluatorManager for expired evaluator and 
close it.
+        return;
+      }
+
       final StringBuilder message = new StringBuilder("Contact from unknown 
Evaluator with identifier '");
       message.append(evaluatorId);
       if (heartbeat.hasEvaluatorStatus()) {
@@ -70,7 +91,8 @@ public final class EvaluatorHeartbeatHandler
       }
       message.append('\'');
       throw new RuntimeException(message.toString());
+    } finally {
+      LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
     }
-    LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 35dcd35..f4309c8 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -23,7 +23,7 @@ import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.evaluator.CLRProcessFactory;
 import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
 import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.driver.restart.DriverRestartUtilities;
+import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.tang.ConfigurationProvider;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.context.FailedContext;
@@ -323,6 +323,9 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     }
   }
 
+  /**
+   * Process an evaluator heartbeat message.
+   */
   public void onEvaluatorHeartbeatMessage(
       final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> 
evaluatorHeartbeatProtoRemoteMessage) {
 
@@ -340,31 +343,42 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       this.sanityChecker.check(evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
       final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
 
-      // first message from a running evaluator trying to re-establish 
communications
-      if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
evaluatorId)) {
-        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-        this.stateManager.setRunning();
+      final EvaluatorRestartState evaluatorRestartState = 
driverRestartManager.getEvaluatorRestartState(evaluatorId);
 
-        final boolean restartCompleted =
-            
this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId);
+      /*
+       * First message from a running evaluator. The evaluator can be a new 
evaluator or be a previous evaluator
+       * from a separate application attempt. In the case of a previous 
evaluator, if the restart period has not
+       * yet expired, we should register it and trigger context active and 
task events. If the restart period has
+       * expired, we should return immediately after setting its remote ID in 
order to close it.
+       */
+      if (this.stateManager.isSubmitted() ||
+          evaluatorRestartState == EvaluatorRestartState.REPORTED ||
+          evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
 
-        LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", 
this.evaluatorId);
+        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
 
-        if (restartCompleted) {
-          this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
-          LOG.log(Level.INFO, "All expected evaluators checked in.");
-        } else {
-          LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
-              new Object[]{this.driverRestartManager.getPreviousEvaluatorIds(),
-                  this.driverRestartManager.getRecoveredEvaluatorIds()});
+        if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
+          // Don't do anything if evaluator has expired. Close it immediately 
upon exit of this method.
+          return;
         }
-      }
 
-      // If this is the first message from this Evaluator, register it.
-      if (this.stateManager.isSubmitted()) {
-        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
         this.stateManager.setRunning();
         LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
+
+        if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
+
+          // TODO[REEF-617]: Move evaluator recovery to 
EvaluatorHeartbeatHandler and reregister evaluator here.
+          final boolean restartCompleted =
+              
this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId);
+
+          LOG.log(Level.FINE, "Received recovery heartbeat from evaluator 
{0}.", this.evaluatorId);
+
+          // TODO[REEF-617]: Move restart completion logic to 
DriverRestartManager.
+          if (restartCompleted) {
+            this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
+            LOG.log(Level.INFO, "All expected evaluators checked in.");
+          }
+        }
       }
 
       // Process the Evaluator status message
@@ -491,8 +505,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
           taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
           taskStatusProto.getState() == ReefServiceProtos.State.RUNNING ||
-          // for task from recovered evaluators
-          
DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
evaluatorId)) {
+          driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.REREGISTERED) {
 
         // [REEF-308] exposes a bug where the .NET evaluator does not send its 
states in the right order
         // [REEF-289] is a related item which may fix the issue

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 5436494..a09532b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.driver.restart.DriverRestartUtilities;
+import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.proto.ReefServiceProtos;
@@ -88,7 +88,7 @@ public final class TaskRepresenter {
       throw new RuntimeException("Received a message for task " + 
taskStatusProto.getTaskId() +
           " in the TaskRepresenter for Task " + this.taskId);
     }
-    if (taskStatusProto.getRecovery()) {
+    if 
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == 
EvaluatorRestartState.REREGISTERED) {
       // when a recovered heartbeat is received, we will take its word for it
       LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
           new Object[]{taskStatusProto.getState(), this.taskId});
@@ -139,9 +139,10 @@ public final class TaskRepresenter {
     }
 
     // fire driver restart task running handler if this is a recovery heartbeat
-    if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
evaluatorManager.getId())) {
+    if 
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == 
EvaluatorRestartState.REREGISTERED) {
       final RunningTask runningTask = new RunningTaskImpl(
           this.evaluatorManager, this.taskId, this.context, this);
+      
this.driverRestartManager.setEvaluatorRunningTask(evaluatorManager.getId());
       this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
     }
 

Reply via email to