Repository: incubator-reef
Updated Branches:
  refs/heads/master a7b655790 -> 2a09c826a


[REEF-561] Move restart functions from DriverStatusManager to 
DriverRestartManager

This addressed the issue by
  * Splitting the original `DriverRestartManager` into
    `DriverRestartManager` and `DriverRuntimeRestartManager`.
  * The implementation `DriverRestartManagerImpl` is responsible for
    non-runtime dependent code, while the interface
    `DriverRuntimeRestartManager` is responsible for runtime-dependent
    restart methods.
  * Moving restart-related functionality from the `DriverStatusManager`
    to `DriverRestartManager` and `DriverRuntimeRestartManager`.
  * Additionally correcting the behavior of `RuntimeClock` on failure so
    that it propagates the exception.

JIRA:
  [REEF-561](https://issues.apache.org/jira/browse/REEF-561)

Pull Request:
  This closes #340


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2a09c826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2a09c826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2a09c826

Branch: refs/heads/master
Commit: 2a09c826a06ac24ee85b99f3267150e543e65b46
Parents: a7b6557
Author: Andrew Chung <[email protected]>
Authored: Wed Aug 5 14:49:25 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 6 15:20:37 2015 -0700

----------------------------------------------------------------------
 .../driver/restart/DriverRestartManager.java    |  54 +++--
 .../restart/DriverRestartManagerImpl.java       | 111 +++++++++
 .../restart/DriverRuntimeRestartManager.java    |  66 +++++
 ...atorPreservingEvaluatorAllocatedHandler.java |   4 +-
 .../driver/restart/EvaluatorRestartInfo.java    |  56 +++++
 .../DriverRuntimeRestartConfiguration.java      |   5 +-
 .../common/driver/DriverStartHandler.java       |  13 +-
 .../common/driver/DriverStatusManager.java      |  67 ------
 .../driver/evaluator/EvaluatorManager.java      | 112 +++++++--
 .../driver/YarnDriverRestartConfiguration.java  |   4 +-
 .../yarn/driver/YarnDriverRestartManager.java   | 238 -------------------
 .../driver/YarnDriverRuntimeRestartManager.java | 231 ++++++++++++++++++
 .../reef/wake/time/runtime/RuntimeClock.java    |   6 +-
 13 files changed, 605 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index c60740f..99bd7b0 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -21,43 +21,67 @@ package org.apache.reef.driver.restart;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.annotations.audience.RuntimeAuthor;
 
 /**
- * Classes implementing this interface are in charge of recording evaluator
- * changes as they are allocated as well as recovering Evaluators and
- * discovering which evaluators are lost on the event of a driver restart.
+ * The manager that handles aspects of driver restart such as determining 
whether the driver is in
+ * restart mode, what to do on restart, whether restart is completed, and 
others.
  */
 @DriverSide
 @Private
-@RuntimeAuthor
 @Unstable
 public interface DriverRestartManager {
 
   /**
-   * Determines whether or not the driver has been restarted.
+   * @return Whether or not the driver instance is a restarted instance.
    */
   boolean isRestart();
 
   /**
-   * This function has a few jobs crucial jobs to enable restart:
-   * 1. Recover the list of evaluators that are reported to be alive by the 
Resource Manager.
-   * 2. Make necessary operations to inform relevant runtime components about 
evaluators that are alive
-   * with the set of evaluator IDs recovered in step 1.
-   * 3. Make necessary operations to inform relevant runtime components about 
evaluators that have failed
-   * during the driver restart period.
+   * Recovers the list of alive and failed evaluators and inform about 
evaluator failures
+   * based on the specific runtime. Also sets the expected amount of 
evaluators to report back
+   * as alive to the job driver.
    */
   void onRestart();
 
   /**
-   * Records the evaluators when it is allocated.
+   * Indicate that the Driver restart is complete. It is meant to be called 
exactly once during a restart and never
+   * during the ininital launch of a Driver.
+   */
+  void setRestartCompleted();
+
+  /**
+   * @return the number of Evaluators expected to check in from a previous run.
+   */
+  int getNumPreviousContainers();
+
+
+  /**
+   * Set the number of containers to expect still active from a previous 
execution of the Driver in a restart situation.
+   * To be called exactly once during a driver restart.
+   *
+   * @param num
+   */
+  void setNumPreviousContainers(final int num);
+
+  /**
+   * @return the number of Evaluators from a previous Driver that have checked 
in with the Driver
+   * in a restart situation.
+   */
+  int getNumRecoveredContainers();
+
+  /**
+   * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
+   */
+  void oneContainerRecovered();
+
+  /**
+   * Records the evaluators when it is allocated. The implementation depends 
on the runtime.
    * @param id The evaluator ID of the allocated evaluator.
    */
   void recordAllocatedEvaluator(final String id);
 
-
   /**
-   * Records a removed evaluator into the evaluator log.
+   * Records a removed evaluator into the evaluator log. The implementation 
depends on the runtime.
    * @param id The evaluator ID of the removed evaluator.
    */
   void recordRemovedEvaluator(final String id);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
new file mode 100644
index 0000000..0d7a75a
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The implementation of DriverRestartManager. A few methods here are proxy 
methods for
+ * the DriverRuntimeRestartManager that depends on the runtime implementation.
+ */
+@DriverSide
+@Private
+@Unstable
+public final class DriverRestartManagerImpl implements DriverRestartManager {
+  private static final Logger LOG = 
Logger.getLogger(DriverRestartManagerImpl.class.getName());
+  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+
+  private boolean restartCompleted;
+  private int numPreviousContainers;
+  private int numRecoveredContainers;
+
+  @Inject
+  private DriverRestartManagerImpl(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
+    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
+    this.restartCompleted = false;
+    this.numPreviousContainers = -1;
+    this.numRecoveredContainers = 0;
+  }
+
+  @Override
+  public boolean isRestart() {
+    return driverRuntimeRestartManager.isRestart();
+  }
+
+  @Override
+  public void onRestart() {
+    final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
+    setNumPreviousContainers(evaluatorRestartInfo.getAliveEvaluators().size());
+    
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+  }
+
+  @Override
+  public synchronized void setRestartCompleted() {
+    if (this.restartCompleted) {
+      LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
+    } else {
+      this.restartCompleted = true;
+    }
+  }
+
+  @Override
+  public synchronized int getNumPreviousContainers() {
+    return this.numPreviousContainers;
+  }
+
+  @Override
+  public synchronized void setNumPreviousContainers(final int num) {
+    if (this.numPreviousContainers >= 0) {
+      throw new IllegalStateException("Attempting to set the number of 
expected containers left " +
+          "from a previous container more than once.");
+    } else {
+      this.numPreviousContainers = num;
+    }
+  }
+
+  @Override
+  public synchronized int getNumRecoveredContainers() {
+    return this.numRecoveredContainers;
+  }
+
+  @Override
+  public synchronized void oneContainerRecovered() {
+    this.numRecoveredContainers += 1;
+    if (this.numRecoveredContainers > this.numPreviousContainers) {
+      throw new IllegalStateException("Reconnected to" +
+          this.numRecoveredContainers + "Evaluators while only expecting " + 
this.numPreviousContainers);
+    }
+  }
+
+  @Override
+  public void recordAllocatedEvaluator(final String id) {
+    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
+  }
+
+  @Override
+  public void recordRemovedEvaluator(final String id) {
+    driverRuntimeRestartManager.recordRemovedEvaluator(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
new file mode 100644
index 0000000..c581e93
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+import java.util.Set;
+
+/**
+ * Classes implementing this interface are in charge of recording evaluator
+ * changes as they are allocated as well as recovering Evaluators and
+ * discovering which evaluators are lost on the event of a driver restart.
+ */
+@DriverSide
+@Private
+@RuntimeAuthor
+@Unstable
+public interface DriverRuntimeRestartManager {
+  /**
+   * Determines whether or not the driver has been restarted.
+   */
+  boolean isRestart();
+
+  /**
+   * Records the evaluators when it is allocated.
+   * @param id The evaluator ID of the allocated evaluator.
+   */
+  void recordAllocatedEvaluator(final String id);
+
+  /**
+   * Records a removed evaluator into the evaluator log.
+   * @param id The evaluator ID of the removed evaluator.
+   */
+  void recordRemovedEvaluator(final String id);
+
+  /**
+   * Gets the sets of alive and failed evaluators based on the runtime 
implementation.
+   * @return EvaluatorRestartInfo, which encapsulates the alive and failed set 
of evaluator IDs.
+   */
+  EvaluatorRestartInfo getAliveAndFailedEvaluators();
+
+  /**
+   * Informs the necessary components about failed evaluators. The 
implementation is runtime dependent.
+   * @param failedEvaluatorIds The set of evaluator IDs of evaluators that 
failed during restart.
+   */
+  void informAboutEvaluatorFailures(final Set<String> failedEvaluatorIds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
index ca9321f..e30d695 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
@@ -24,7 +24,7 @@ import org.apache.reef.wake.EventHandler;
 import javax.inject.Inject;
 
 /**
- * Records allocated evaluators for recovery on driver restart by using a 
DriverRestartManager.
+ * Records allocated evaluators for recovery on driver restart by using a 
DriverRuntimeRestartManager.
  */
 public final class EvaluatorPreservingEvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
   private final DriverRestartManager driverRestartManager;
@@ -35,7 +35,7 @@ public final class 
EvaluatorPreservingEvaluatorAllocatedHandler implements Event
   }
 
   /**
-   * Records the allocatedEvaluator ID with the DriverRestartManager.
+   * Records the allocatedEvaluator ID with the DriverRuntimeRestartManager.
    * @param value the allocated evaluator event.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
new file mode 100644
index 0000000..10beb78
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * The encapsulating class for alive and failed evaluators on driver restart.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class EvaluatorRestartInfo {
+  private final Set<String> aliveEvaluators;
+  private final Set<String> failedEvaluators;
+
+  public EvaluatorRestartInfo(final Set<String> aliveEvaluators, final 
Set<String> failedEvaluators) {
+    this.aliveEvaluators = Collections.unmodifiableSet(aliveEvaluators);
+    this.failedEvaluators = Collections.unmodifiableSet(failedEvaluators);
+  }
+
+  /**
+   * @return the set of evaluator IDs for alive evaluators on driver restart. 
The returned set is unmodifiable.
+   */
+  public Set<String> getAliveEvaluators() {
+    return this.aliveEvaluators;
+  }
+
+  /**
+   * @return the set of evaluator IDs for faiuled evaluators on driver 
restart. The returned set is unmodifiable.
+   */
+  public Set<String> getFailedEvaluators() {
+    return this.failedEvaluators;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 70d0c9b..695ac8a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -23,9 +23,7 @@ import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
 import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
 import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
-import 
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorAllocatedHandler;
-import 
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorCompletedHandler;
-import 
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorFailedHandler;
+import org.apache.reef.driver.restart.*;
 import org.apache.reef.tang.formats.*;
 
 /**
@@ -40,6 +38,7 @@ public final class DriverRuntimeRestartConfiguration extends 
ConfigurationModule
   }
 
   public static final ConfigurationModule CONF = new 
DriverRuntimeRestartConfiguration()
+      .bindImplementation(DriverRestartManager.class, 
DriverRestartManagerImpl.class)
       .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, 
EvaluatorPreservingEvaluatorAllocatedHandler.class)
       .bindSetEntry(ServiceEvaluatorFailedHandlers.class, 
EvaluatorPreservingEvaluatorFailedHandler.class)
       .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, 
EvaluatorPreservingEvaluatorCompletedHandler.class)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index bfd52a3..2811497 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -50,10 +50,9 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
                      final Set<EventHandler<StartTime>> restartHandlers,
                      @Parameter(ServiceDriverRestartedHandlers.class)
                      final Set<EventHandler<StartTime>> serviceRestartHandlers,
-                     final DriverRestartManager driverRestartManager,
-                     final DriverStatusManager driverStatusManager) {
+                     final DriverRestartManager driverRestartManager) {
     this(startHandler, Optional.of(restartHandlers), 
Optional.of(serviceRestartHandlers),
-        Optional.of(driverRestartManager), driverStatusManager);
+        Optional.of(driverRestartManager));
     LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers 
[{0}], RestartHandlers [{1}]," +
             "and ServiceRestartHandlers [{2}], with a restart manager.",
         new String[] {this.startHandlers.toString(), 
this.restartHandlers.toString(),
@@ -62,10 +61,9 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
 
   @Inject
   
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
-                     final Set<EventHandler<StartTime>> startHandlers,
-                     final DriverStatusManager driverStatusManager) {
+                     final Set<EventHandler<StartTime>> startHandlers) {
     this(startHandlers, Optional.<Set<EventHandler<StartTime>>>empty(),
-        Optional.<Set<EventHandler<StartTime>>>empty(), 
Optional.<DriverRestartManager>empty(), driverStatusManager);
+        Optional.<Set<EventHandler<StartTime>>>empty(), 
Optional.<DriverRestartManager>empty());
     LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers 
[{0}] and no restart.",
         this.startHandlers.toString());
   }
@@ -73,8 +71,7 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
   private DriverStartHandler(final Set<EventHandler<StartTime>> startHandler,
                              final Optional<Set<EventHandler<StartTime>>> 
restartHandlers,
                              final Optional<Set<EventHandler<StartTime>>> 
serviceRestartHandlers,
-                             final Optional<DriverRestartManager> 
driverRestartManager,
-                             final DriverStatusManager driverStatusManager) {
+                             final Optional<DriverRestartManager> 
driverRestartManager) {
     this.startHandlers = startHandler;
     this.restartHandlers = restartHandlers;
     this.serviceRestartHandlers = serviceRestartHandlers;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 0c913de..4fbf618 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -43,9 +43,6 @@ public final class DriverStatusManager {
   private DriverStatus driverStatus = DriverStatus.PRE_INIT;
   private Optional<Throwable> shutdownCause = Optional.empty();
   private boolean driverTerminationHasBeenCommunicatedToClient = false;
-  private boolean restartCompleted = false;
-  private int numPreviousContainers = -1;
-  private int numRecoveredContainers = 0;
 
 
   /**
@@ -204,70 +201,6 @@ public final class DriverStatusManager {
     }
   }
 
-  /**
-   * Indicate that the Driver restart is complete. It is meant to be called 
exactly once during a restart and never
-   * during the ininital launch of a Driver.
-   */
-  public synchronized void setRestartCompleted() {
-    if (!this.isDriverRestart()) {
-      throw new IllegalStateException("setRestartCompleted() called in a 
Driver that is not, in fact, restarted.");
-    } else if (this.restartCompleted) {
-      LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
-    } else {
-      this.restartCompleted = true;
-    }
-  }
-
-  /**
-   * @return the number of Evaluators expected to check in from a previous run.
-   */
-  public synchronized int getNumPreviousContainers() {
-    return this.numPreviousContainers;
-  }
-
-  /**
-   * Set the number of containers to expect still active from a previous 
execution of the Driver in a restart situation.
-   * To be called exactly once during a driver restart.
-   *
-   * @param num
-   */
-  public synchronized void setNumPreviousContainers(final int num) {
-    if (this.numPreviousContainers >= 0) {
-      throw new IllegalStateException("Attempting to set the number of 
expected containers left " +
-          "from a previous container more than once.");
-    } else {
-      this.numPreviousContainers = num;
-    }
-  }
-
-  /**
-   * @return the number of Evaluators from a previous Driver that have checked 
in with the Driver
-   * in a restart situation.
-   */
-  public synchronized int getNumRecoveredContainers() {
-    return this.numRecoveredContainers;
-  }
-
-  /**
-   * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
-   */
-  public synchronized void oneContainerRecovered() {
-    this.numRecoveredContainers += 1;
-    if (this.numRecoveredContainers > this.numPreviousContainers) {
-      throw new IllegalStateException("Reconnected to" +
-          this.numRecoveredContainers +
-          "Evaluators while only expecting " +
-          this.numPreviousContainers);
-    }
-  }
-
-  /**
-   * @return true if the Driver is a restarted driver of an earlier attempt.
-   */
-  private synchronized boolean isDriverRestart() {
-    return this.getNumPreviousContainers() > 0;
-  }
-
   public synchronized boolean isShuttingDownOrFailing() {
     return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
         || DriverStatus.FAILING.equals(this.driverStatus);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 91e9e1d..e73cfef 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -21,6 +21,8 @@ package org.apache.reef.runtime.common.driver.evaluator;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.evaluator.CLRProcessFactory;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.tang.ConfigurationProvider;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.context.FailedContext;
@@ -35,7 +37,6 @@ import org.apache.reef.io.naming.Identifiable;
 import org.apache.reef.proto.EvaluatorRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.DriverRestartCompleted;
-import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import org.apache.reef.driver.evaluator.EvaluatorProcess;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
@@ -96,7 +97,6 @@ public final class EvaluatorManager implements Identifiable, 
AutoCloseable {
   private final ContextControlHandler contextControlHandler;
   private final EvaluatorStatusManager stateManager;
   private final ExceptionCodec exceptionCodec;
-  private final DriverStatusManager driverStatusManager;
   private final EventHandlerIdlenessSource idlenessSource;
   private final RemoteManager remoteManager;
   private final ConfigurationSerializer configurationSerializer;
@@ -104,6 +104,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
   private final JVMProcessFactory jvmProcessFactory;
   private final CLRProcessFactory clrProcessFactory;
+  private final Optional<DriverRestartManager> driverRestartManager;
 
   // Mutable fields
   private Optional<TaskRepresenter> task = Optional.empty();
@@ -124,15 +125,68 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       final EvaluatorControlHandler evaluatorControlHandler,
       final ContextControlHandler contextControlHandler,
       final EvaluatorStatusManager stateManager,
-      final DriverStatusManager driverStatusManager,
       final ExceptionCodec exceptionCodec,
       final EventHandlerIdlenessSource idlenessSource,
       final LoggingScopeFactory loggingScopeFactory,
       @Parameter(EvaluatorConfigurationProviders.class)
       final Set<ConfigurationProvider> evaluatorConfigurationProviders,
-      // TODO: Eventually remove the factories when they are removed from 
AllocatedEvaluatorImpl
+      final JVMProcessFactory jvmProcessFactory,
+      final CLRProcessFactory clrProcessFactory,
+      final DriverRestartManager driverRestartManager) {
+    this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler, 
evaluatorId, evaluatorDescriptor,
+        contextRepresenters, configurationSerializer, messageDispatcher, 
evaluatorControlHandler,
+        contextControlHandler, stateManager, exceptionCodec, idlenessSource, 
loggingScopeFactory,
+        evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory, 
Optional.of(driverRestartManager));
+  }
+
+  @Inject
+  private EvaluatorManager(
+      final Clock clock,
+      final RemoteManager remoteManager,
+      final ResourceReleaseHandler resourceReleaseHandler,
+      final ResourceLaunchHandler resourceLaunchHandler,
+      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
+      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl 
evaluatorDescriptor,
+      final ContextRepresenters contextRepresenters,
+      final ConfigurationSerializer configurationSerializer,
+      final EvaluatorMessageDispatcher messageDispatcher,
+      final EvaluatorControlHandler evaluatorControlHandler,
+      final ContextControlHandler contextControlHandler,
+      final EvaluatorStatusManager stateManager,
+      final ExceptionCodec exceptionCodec,
+      final EventHandlerIdlenessSource idlenessSource,
+      final LoggingScopeFactory loggingScopeFactory,
+      @Parameter(EvaluatorConfigurationProviders.class)
+      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
       final JVMProcessFactory jvmProcessFactory,
       final CLRProcessFactory clrProcessFactory) {
+    this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler, 
evaluatorId, evaluatorDescriptor,
+        contextRepresenters, configurationSerializer, messageDispatcher, 
evaluatorControlHandler,
+        contextControlHandler, stateManager, exceptionCodec, idlenessSource, 
loggingScopeFactory,
+        evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory, 
Optional.<DriverRestartManager>empty());
+  }
+
+  private EvaluatorManager(
+      final Clock clock,
+      final RemoteManager remoteManager,
+      final ResourceReleaseHandler resourceReleaseHandler,
+      final ResourceLaunchHandler resourceLaunchHandler,
+      final String evaluatorId,
+      final EvaluatorDescriptorImpl evaluatorDescriptor,
+      final ContextRepresenters contextRepresenters,
+      final ConfigurationSerializer configurationSerializer,
+      final EvaluatorMessageDispatcher messageDispatcher,
+      final EvaluatorControlHandler evaluatorControlHandler,
+      final ContextControlHandler contextControlHandler,
+      final EvaluatorStatusManager stateManager,
+      final ExceptionCodec exceptionCodec,
+      final EventHandlerIdlenessSource idlenessSource,
+      final LoggingScopeFactory loggingScopeFactory,
+      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
+      // TODO: Eventually remove the factories when they are removed from 
AllocatedEvaluatorImpl
+      final JVMProcessFactory jvmProcessFactory,
+      final CLRProcessFactory clrProcessFactory,
+      final Optional<DriverRestartManager> driverRestartManager) {
     this.contextRepresenters = contextRepresenters;
     this.idlenessSource = idlenessSource;
     LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: 
{0}", evaluatorId);
@@ -146,7 +200,6 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     this.evaluatorControlHandler = evaluatorControlHandler;
     this.contextControlHandler = contextControlHandler;
     this.stateManager = stateManager;
-    this.driverStatusManager = driverStatusManager;
     this.exceptionCodec = exceptionCodec;
 
     this.remoteManager = remoteManager;
@@ -155,6 +208,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
     this.jvmProcessFactory = jvmProcessFactory;
     this.clrProcessFactory = clrProcessFactory;
+    this.driverRestartManager = driverRestartManager;
 
     LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: 
[{0}]", this.getId());
   }
@@ -195,7 +249,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
       allocationFired = true;
     } else {
-      LOG.log(Level.WARNING, "Evaluator allocated event fired twice.");
+      LOG.log(Level.WARNING, "Evaluator allocated event fired more than 
once.");
     }
   }
 
@@ -339,27 +393,33 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
       // first message from a running evaluator trying to re-establish 
communications
       if (evaluatorHeartbeatProto.getRecovery()) {
-        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-        this.stateManager.setRunning();
-
-        this.driverStatusManager.oneContainerRecovered();
-        final int numRecoveredContainers = 
this.driverStatusManager.getNumRecoveredContainers();
-
-        LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", 
this.evaluatorId);
-        final int expectedEvaluatorsNumber = 
this.driverStatusManager.getNumPreviousContainers();
-
-        if (numRecoveredContainers > expectedEvaluatorsNumber) {
-          LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, 
but [{1}] evaluators have checked in.",
-              new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
-          throw new RuntimeException("More then expected number of evaluators 
are checking in during recovery.");
-        } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
-          LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. 
Recovery completed.",
-              expectedEvaluatorsNumber);
-          this.driverStatusManager.setRestartCompleted();
-          this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
+        if(this.driverRestartManager.isPresent()) {
+          this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+          this.stateManager.setRunning();
+
+          this.driverRestartManager.get().oneContainerRecovered();
+          final int numRecoveredContainers = 
this.driverRestartManager.get().getNumRecoveredContainers();
+
+          LOG.log(Level.FINE, "Received recovery heartbeat from evaluator 
{0}.", this.evaluatorId);
+          final int expectedEvaluatorsNumber = 
this.driverRestartManager.get().getNumPreviousContainers();
+
+          if (numRecoveredContainers > expectedEvaluatorsNumber) {
+            LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, 
but [{1}] evaluators have checked in.",
+                new Object[]{expectedEvaluatorsNumber, 
numRecoveredContainers});
+            throw new RuntimeException("More then expected number of 
evaluators are checking in during recovery.");
+          } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
+            LOG.log(Level.INFO, "All [{0}] expected evaluators have checked 
in. Recovery completed.",
+                expectedEvaluatorsNumber);
+            this.driverRestartManager.get().setRestartCompleted();
+            this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
+          } else {
+            LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] 
evaluators have checked in.",
+                new Object[]{expectedEvaluatorsNumber, 
numRecoveredContainers});
+          }
         } else {
-          LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] 
evaluators have checked in.",
-              new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+          final String errorMsg = "Restart configurations are not set 
properly. The DriverRestartManager is missing.";
+          LOG.log(Level.SEVERE, errorMsg);
+          throw new DriverFatalRuntimeException(errorMsg);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
index 5f10bf8..1b563a2 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.reef.annotations.Provided;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
-import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
 import org.apache.reef.runtime.common.driver.DriverRuntimeRestartConfiguration;
 import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
@@ -49,7 +49,7 @@ public final class YarnDriverRestartConfiguration extends 
ConfigurationModuleBui
    */
   public static final ConfigurationModule CONF = new 
YarnDriverRestartConfiguration()
       .bindNamedParameter(YarnEvaluatorPreserver.class, EVALUATOR_PRESERVER)
-      .bindImplementation(DriverRestartManager.class, 
YarnDriverRestartManager.class)
+      .bindImplementation(DriverRuntimeRestartManager.class, 
YarnDriverRuntimeRestartManager.class)
       .merge(DriverRuntimeRestartConfiguration.CONF)
       .build();
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
deleted file mode 100644
index 43920e2..0000000
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.runtime.common.driver.DriverStatusManager;
-import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
-import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
-import org.apache.reef.tang.annotations.Parameter;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The implementation of restart manager for YARN. Handles evaluator 
preservation as well
- * as evaluator recovery on YARN.
- */
-@DriverSide
-@RuntimeAuthor
-@Private
-@Unstable
-public final class YarnDriverRestartManager implements DriverRestartManager {
-
-  private static final Logger LOG = 
Logger.getLogger(YarnDriverRestartManager.class.getName());
-
-  private final EvaluatorPreserver evaluatorPreserver;
-  private final ApplicationMasterRegistration registration;
-  private final DriverStatusManager driverStatusManager;
-  private final REEFEventHandlers reefEventHandlers;
-  private Set<Container> previousContainers;
-
-  @Inject
-  private YarnDriverRestartManager(@Parameter(YarnEvaluatorPreserver.class)
-                           final EvaluatorPreserver evaluatorPreserver,
-                           final REEFEventHandlers reefEventHandlers,
-                           final ApplicationMasterRegistration registration,
-                           final DriverStatusManager driverStatusManager){
-    this.registration = registration;
-    this.evaluatorPreserver = evaluatorPreserver;
-    this.driverStatusManager = driverStatusManager;
-    this.reefEventHandlers = reefEventHandlers;
-    this.previousContainers = null;
-  }
-
-  /**
-   * Determines whether the application master has been restarted based on the 
container ID environment
-   * variable provided by YARN. If that fails, determine whether the 
application master is a restart
-   * based on the number of previous containers reported by YARN.
-   * @return true if the application master is a restarted instance, false 
otherwise.
-   */
-  @Override
-  public boolean isRestart() {
-    final String containerIdString = getContainerIdString();
-
-    if (containerIdString == null) {
-      // container id should always be set in the env by the framework
-      LOG.log(Level.WARNING, "Container ID is null, determining restart based 
on previous containers.");
-      return this.isRestartByPreviousContainers();
-    }
-
-    final ApplicationAttemptId appAttemptID = 
getAppAttemptId(containerIdString);
-
-    if (appAttemptID == null) {
-      LOG.log(Level.WARNING, "applicationAttempt ID is null, determining 
restart based on previous containers.");
-      return this.isRestartByPreviousContainers();
-    }
-
-    LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId());
-
-    return appAttemptID.getAttemptId() > 1;
-  }
-
-  private static String getContainerIdString() {
-    try {
-      return 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key());
-    } catch (Exception e) {
-      LOG.log(Level.WARNING, "Unable to get the container ID from the 
environment, exception " +
-          e + " was thrown.");
-      return null;
-    }
-  }
-
-  private static ApplicationAttemptId getAppAttemptId(final String 
containerIdString) {
-    try {
-      final ContainerId containerId = 
ConverterUtils.toContainerId(containerIdString);
-      return containerId.getApplicationAttemptId();
-    } catch (Exception e) {
-      LOG.log(Level.WARNING, "Unable to get the applicationAttempt ID from the 
environment, exception " +
-          e + " was thrown.");
-      return null;
-    }
-  }
-
-  /**
-   * Initializes the list of previous containers and determine whether or not 
this is an instance of restart
-   * based on information reported by the RM.
-   * @return true if previous containers is not empty.
-   */
-  private boolean isRestartByPreviousContainers() {
-    this.initializeListOfPreviousContainers();
-    return !this.previousContainers.isEmpty();
-  }
-
-  /**
-   * Initializes the list of previous containers as reported by YARN.
-   */
-  private synchronized void initializeListOfPreviousContainers() {
-    if (this.previousContainers == null) {
-      this.previousContainers = new 
HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts());
-
-      // If it's still null, create an empty list to indicate that it's not a 
restart.
-      if (this.previousContainers == null) {
-        this.previousContainers = new HashSet<>();
-      }
-    }
-  }
-
-  @Override
-  public void onRestart() {
-    final Set<String> recoveredEvaluators = new HashSet<>();
-    final Set<String> failedEvaluators = new HashSet<>();
-
-    this.initializeListOfPreviousContainers();
-
-    if (this.previousContainers != null && !this.previousContainers.isEmpty()) 
{
-      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", 
this.previousContainers.size());
-      final Set<String> expectedContainers = 
this.evaluatorPreserver.recoverEvaluators();
-
-      final int numExpectedContainers = expectedContainers.size();
-      final int numPreviousContainers = this.previousContainers.size();
-      if (numExpectedContainers > numPreviousContainers) {
-        // we expected more containers to be alive, some containers must have 
died during driver restart
-        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are 
still alive",
-            new Object[]{numExpectedContainers, numPreviousContainers});
-        final Set<String> previousContainersIds = new HashSet<>();
-        for (final Container container : this.previousContainers) {
-          previousContainersIds.add(container.getId().toString());
-        }
-        for (final String expectedContainerId : expectedContainers) {
-          if (!previousContainersIds.contains(expectedContainerId)) {
-            
this.evaluatorPreserver.recordRemovedEvaluator(expectedContainerId);
-            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must 
have failed during driver restart.",
-                expectedContainerId);
-            failedEvaluators.add(expectedContainerId);
-          }
-        }
-      }
-      if (numExpectedContainers < numPreviousContainers) {
-        // somehow we have more alive evaluators, this should not happen
-        throw new RuntimeException("Expected only [" + numExpectedContainers + 
"] containers " +
-            "but resource manager believe that [" + numPreviousContainers + "] 
are outstanding for driver.");
-      }
-
-      //  numExpectedContainers == numPreviousContainers
-      for (final Container container : this.previousContainers) {
-        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
-        if (!expectedContainers.contains(container.getId().toString())) {
-          throw new RuntimeException("Not expecting container " + 
container.getId().toString());
-        }
-
-        recoveredEvaluators.add(container.getId().toString());
-      }
-    }
-
-    this.informAboutEvaluatorAlive(recoveredEvaluators);
-    this.informAboutEvaluatorFailures(failedEvaluators);
-  }
-
-  /**
-   * Informs the driver status manager about the number of evaluators to wait 
for to reinitiate contact
-   * with the driver.
-   * TODO [REEF-559]: Tighten previous evaluator ID checks by using entire set 
of evaluator IDs.
-   * @param evaluatorIds The set of evaluator IDs of evaluators expected to be 
alive.
-   */
-  private void informAboutEvaluatorAlive(final Set<String> evaluatorIds) {
-    // We will wait for these evaluators to contact us, so we do not need to 
record the entire container information.
-    this.driverStatusManager.setNumPreviousContainers(evaluatorIds.size());
-  }
-
-  /**
-   * Generate failure events for evaluators that cannot be recovered.
-   * @param evaluatorIds The set of evaluator IDs of evaluators that have 
failed on restart.
-   */
-  private void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
-    for (String evaluatorId : evaluatorIds) {
-      LOG.log(Level.WARNING, "Container [" + evaluatorId +
-          "] has failed during driver restart process, FailedEvaluatorHandler 
will be triggered, but " +
-          "no additional evaluator can be requested due to YARN-2433.");
-      // trigger a failed evaluator event
-      
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
-          .setIdentifier(evaluatorId)
-          .setState(ReefServiceProtos.State.FAILED)
-          .setExitCode(1)
-          .setDiagnostics("Container [" + evaluatorId + "] failed during 
driver restart process.")
-          .setIsFromPreviousDriver(true)
-          .build());
-    }
-  }
-
-  @Override
-  public void recordAllocatedEvaluator(final String id) {
-    this.evaluatorPreserver.recordAllocatedEvaluator(id);
-  }
-
-  @Override
-  public void recordRemovedEvaluator(final String id) {
-    this.evaluatorPreserver.recordRemovedEvaluator(id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
new file mode 100644
index 0000000..466fec2
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
+import org.apache.reef.driver.restart.EvaluatorRestartInfo;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The implementation of restart manager for YARN. Handles evaluator 
preservation as well
+ * as evaluator recovery on YARN.
+ */
+@DriverSide
+@RuntimeAuthor
+@Private
+@Unstable
+public final class YarnDriverRuntimeRestartManager implements 
DriverRuntimeRestartManager {
+
+  private static final Logger LOG = 
Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName());
+
+  private final EvaluatorPreserver evaluatorPreserver;
+  private final ApplicationMasterRegistration registration;
+  private final REEFEventHandlers reefEventHandlers;
+  private Set<Container> previousContainers;
+
+  @Inject
+  private 
YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class)
+                                          final EvaluatorPreserver 
evaluatorPreserver,
+                                          final REEFEventHandlers 
reefEventHandlers,
+                                          final ApplicationMasterRegistration 
registration){
+    this.registration = registration;
+    this.evaluatorPreserver = evaluatorPreserver;
+    this.reefEventHandlers = reefEventHandlers;
+    this.previousContainers = null;
+  }
+
+  /**
+   * Determines whether the application master has been restarted based on the 
container ID environment
+   * variable provided by YARN. If that fails, determine whether the 
application master is a restart
+   * based on the number of previous containers reported by YARN.
+   * @return true if the application master is a restarted instance, false 
otherwise.
+   */
+  @Override
+  public boolean isRestart() {
+    final String containerIdString = getContainerIdString();
+
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      LOG.log(Level.WARNING, "Container ID is null, determining restart based 
on previous containers.");
+      return this.isRestartByPreviousContainers();
+    }
+
+    final ApplicationAttemptId appAttemptID = 
getAppAttemptId(containerIdString);
+
+    if (appAttemptID == null) {
+      LOG.log(Level.WARNING, "applicationAttempt ID is null, determining 
restart based on previous containers.");
+      return this.isRestartByPreviousContainers();
+    }
+
+    LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId());
+
+    return appAttemptID.getAttemptId() > 1;
+  }
+
+  private static String getContainerIdString() {
+    try {
+      return 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key());
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Unable to get the container ID from the 
environment, exception " +
+          e + " was thrown.");
+      return null;
+    }
+  }
+
+  private static ApplicationAttemptId getAppAttemptId(final String 
containerIdString) {
+    try {
+      final ContainerId containerId = 
ConverterUtils.toContainerId(containerIdString);
+      return containerId.getApplicationAttemptId();
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Unable to get the applicationAttempt ID from the 
environment, exception " +
+          e + " was thrown.");
+      return null;
+    }
+  }
+
+  /**
+   * Initializes the list of previous containers and determines whether or not 
this is an instance of restart
+   * based on information reported by the RM.
+   * @return true if previous containers is not empty.
+   */
+  private boolean isRestartByPreviousContainers() {
+    this.initializeListOfPreviousContainers();
+    return !this.previousContainers.isEmpty();
+  }
+
+  /**
+   * Initializes the list of previous containers as reported by YARN.
+   */
+  private synchronized void initializeListOfPreviousContainers() {
+    if (this.previousContainers == null) {
+      this.previousContainers = new 
HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts());
+
+      // If it's still null, create an empty list to indicate that it's not a 
restart.
+      if (this.previousContainers == null) {
+        this.previousContainers = new HashSet<>();
+      }
+    }
+  }
+
+  @Override
+  public void recordAllocatedEvaluator(final String id) {
+    this.evaluatorPreserver.recordAllocatedEvaluator(id);
+  }
+
+  @Override
+  public void recordRemovedEvaluator(final String id) {
+    this.evaluatorPreserver.recordRemovedEvaluator(id);
+  }
+
+  /**
+   * Used by DriverRestartManager. Gets the list of previous containers from 
the resource manager,
+   * compares that list to the YarnDriverRuntimeRestartManager's own list 
based on the evaluator preserver,
+   * and determines which evaluators are alive and which have failed during 
restart.
+   * @return EvaluatorRestartInfo, the object encapsulating alive and failed 
evaluator IDs.
+   */
+  @Override
+  public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
+    final Set<String> recoveredEvaluators = new HashSet<>();
+    final Set<String> failedEvaluators = new HashSet<>();
+
+    this.initializeListOfPreviousContainers();
+
+    if (this.previousContainers != null && !this.previousContainers.isEmpty()) 
{
+      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", 
this.previousContainers.size());
+      final Set<String> expectedContainers = 
this.evaluatorPreserver.recoverEvaluators();
+
+      final int numExpectedContainers = expectedContainers.size();
+      final int numPreviousContainers = this.previousContainers.size();
+      if (numExpectedContainers > numPreviousContainers) {
+        // we expected more containers to be alive, some containers must have 
died during driver restart
+        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are 
still alive",
+            new Object[]{numExpectedContainers, numPreviousContainers});
+        final Set<String> previousContainersIds = new HashSet<>();
+        for (final Container container : this.previousContainers) {
+          previousContainersIds.add(container.getId().toString());
+        }
+        for (final String expectedContainerId : expectedContainers) {
+          if (!previousContainersIds.contains(expectedContainerId)) {
+            
this.evaluatorPreserver.recordRemovedEvaluator(expectedContainerId);
+            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must 
have failed during driver restart.",
+                expectedContainerId);
+            failedEvaluators.add(expectedContainerId);
+          }
+        }
+      }
+      if (numExpectedContainers < numPreviousContainers) {
+        // somehow we have more alive evaluators, this should not happen
+        throw new RuntimeException("Expected only [" + numExpectedContainers + 
"] containers " +
+            "but resource manager believe that [" + numPreviousContainers + "] 
are outstanding for driver.");
+      }
+
+      //  numExpectedContainers == numPreviousContainers
+      for (final Container container : this.previousContainers) {
+        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
+        if (!expectedContainers.contains(container.getId().toString())) {
+          throw new RuntimeException("Not expecting container " + 
container.getId().toString());
+        }
+
+        recoveredEvaluators.add(container.getId().toString());
+      }
+    }
+
+    return new EvaluatorRestartInfo(recoveredEvaluators, failedEvaluators);
+  }
+
+  /**
+   * Calls the appropriate handler via REEFEventHandlers, which is a runtime 
specific implementation
+   * of the YARN runtime.
+   * @param evaluatorIds the set of evaluator IDs of failed evaluators during 
restart.
+   */
+  @Override
+  public void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
+    for (String evaluatorId : evaluatorIds) {
+      LOG.log(Level.WARNING, "Container [" + evaluatorId +
+          "] has failed during driver restart process, FailedEvaluatorHandler 
will be triggered, but " +
+          "no additional evaluator can be requested due to YARN-2433.");
+      // trigger a failed evaluator event
+      
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
+          .setIdentifier(evaluatorId)
+          .setState(ReefServiceProtos.State.FAILED)
+          .setExitCode(1)
+          .setDiagnostics("Container [" + evaluatorId + "] failed during 
driver restart process.")
+          .setIsFromPreviousDriver(true)
+          .build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index acd0c48..01f6744 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -251,7 +251,11 @@ public final class RuntimeClock implements Clock {
           // waiting interrupted - return to loop
         }
       }
-      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
+      if (this.stoppedOnException == null) {
+        this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
+      } else {
+        this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), 
this.stoppedOnException));
+      }
     } catch (final Exception e) {
       e.printStackTrace();
       this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));

Reply via email to