Repository: incubator-reef
Updated Branches:
  refs/heads/master 9c820fd55 -> 0181e2cca


[REEF-483] Make Evaluator logging configurable

This addresses the issue by
  * Creating `DriverRestartManager` interface and implementation
    `YarnDriverRestartManager`.
  * Binding ServiceHandlers for the DriverRestartManager to record Evaluators.
  * Creating the `EvaluatorPreserver` interface and implementing it in
    `DFSEvaluatorPreserver`.
  * Improving object usage in evaluator preservation.

JIRA:
  [REEF-483](https://issues.apache.org/jira/browse/REEF-483)

Pull Request:
  This closes #324


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0181e2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0181e2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0181e2cc

Branch: refs/heads/master
Commit: 0181e2ccaa697ef766455770d0bba4e7023cad31
Parents: 9c820fd
Author: Andrew Chung <[email protected]>
Authored: Thu Jul 30 10:42:20 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 3 20:05:50 2015 -0700

----------------------------------------------------------------------
 .../reef/client/DriverRestartConfiguration.java |  16 +-
 .../reef/client/DriverServiceConfiguration.java |   6 +
 .../FailDriverOnEvaluatorLogErrors.java         |  31 ++
 .../driver/restart/DriverRestartManager.java    |  64 +++++
 ...atorPreservingEvaluatorAllocatedHandler.java |  45 +++
 ...atorPreservingEvaluatorCompletedHandler.java |  46 +++
 ...aluatorPreservingEvaluatorFailedHandler.java |  46 +++
 .../reef/driver/restart/package-info.java       |  22 ++
 .../DriverRuntimeRestartConfiguration.java      |  13 +-
 .../common/driver/DriverStartHandler.java       |  64 +++--
 .../common/driver/EvaluatorPreserver.java       |  51 ++++
 .../yarn/driver/YarnContainerManager.java       | 281 +++----------------
 .../driver/YarnDriverRestartConfiguration.java  |  14 +-
 .../yarn/driver/YarnDriverRestartManager.java   | 164 +++++++++++
 .../parameters/YarnEvaluatorPreserver.java      |  34 +++
 .../restart/DFSEvaluatorLogAppendWriter.java    |  76 +++++
 .../restart/DFSEvaluatorLogOverwriteWriter.java | 104 +++++++
 .../driver/restart/DFSEvaluatorLogWriter.java   |  39 +++
 .../driver/restart/DFSEvaluatorPreserver.java   | 230 +++++++++++++++
 .../yarn/driver/restart/package-info.java       |  22 ++
 20 files changed, 1102 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index 9973fad..225bd41 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -19,18 +19,17 @@
 package org.apache.reef.client;
 
 import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
-import org.apache.reef.driver.parameters.DriverRestartContextActiveHandlers;
-import org.apache.reef.driver.parameters.DriverRestartHandler;
-import org.apache.reef.driver.parameters.DriverRestartTaskRunningHandlers;
+import org.apache.reef.driver.parameters.*;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.runtime.common.DriverRestartCompleted;
 import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.time.event.StartTime;
 
@@ -41,6 +40,7 @@ import org.apache.reef.wake.time.event.StartTime;
 @Public
 @ClientSide
 @Provided
+@Unstable
 public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder {
   /**
    * This event is fired in place of the ON_DRIVER_STARTED when the Driver is 
in fact restarted after failure.
@@ -63,7 +63,15 @@ public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder
   public static final OptionalImpl<EventHandler<DriverRestartCompleted>> 
ON_DRIVER_RESTART_COMPLETED =
       new OptionalImpl<>();
 
+  /**
+   * Parameter that determines whether the driver should fail or continue if 
there are evaluator
+   * preservation log failures. Defaults to false, i.e. the driver continues.
+   */
+  public static final OptionalParameter<Boolean> 
FAIL_DRIVER_ON_EVALUATOR_LOG_ERROR =
+      new OptionalParameter<>();
+
   public static final ConfigurationModule CONF = new 
DriverRestartConfiguration()
+      .bindNamedParameter(FailDriverOnEvaluatorLogErrors.class, 
FAIL_DRIVER_ON_EVALUATOR_LOG_ERROR)
       .bindSetEntry(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
       .bindSetEntry(DriverRestartTaskRunningHandlers.class, 
ON_DRIVER_RESTART_TASK_RUNNING)
       .bindSetEntry(DriverRestartContextActiveHandlers.class, 
ON_DRIVER_RESTART_CONTEXT_ACTIVE)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
index d320f2f..4fe0452 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
@@ -75,6 +75,11 @@ public final class DriverServiceConfiguration extends 
ConfigurationModuleBuilder
   public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED 
= new RequiredImpl<>();
 
   /**
+   * The event handler invoked right after the driver restarts. Bound to ServiceDriverRestartedHandlers.
+   */
+  public static final OptionalImpl<EventHandler<StartTime>> 
ON_DRIVER_RESTARTED = new OptionalImpl<>();
+
+  /**
    * The event handler invoked right before the driver shuts down. Defaults to 
ignore.
    */
   public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = 
new OptionalImpl<>();
@@ -161,6 +166,7 @@ public final class DriverServiceConfiguration extends 
ConfigurationModuleBuilder
           // Start and stop events are the same handlers for applications and 
services.
       .bindSetEntry(Clock.StartHandler.class, ON_DRIVER_STARTED)
       .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
+      .bindSetEntry(ServiceDriverRestartedHandlers.class, ON_DRIVER_RESTARTED)
 
           // Evaluator handlers
       .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, 
ON_EVALUATOR_ALLOCATED)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
new file mode 100644
index 0000000..08c5a29
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Whether to fail the driver on Evaluator preservation log errors. Defaults to false.
+ */
+@NamedParameter(doc = "Whether to fail driver on Evaluator preservation log errors.", default_value = "false")
+public final class FailDriverOnEvaluatorLogErrors implements Name<Boolean> {
+  private FailDriverOnEvaluatorLogErrors() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
new file mode 100644
index 0000000..c60740f
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+/**
+ * Classes implementing this interface are in charge of recording evaluator
+ * changes as they are allocated as well as recovering Evaluators and
+ * discovering which evaluators are lost in the event of a driver restart.
+ */
+@DriverSide
+@Private
+@RuntimeAuthor
+@Unstable
+public interface DriverRestartManager {
+
+  /**
+   * Determines whether or not the driver has been restarted; returns true if it has.
+   */
+  boolean isRestart();
+
+  /**
+   * This function has a few crucial jobs to enable restart:
+   * 1. Recover the list of evaluators that are reported to be alive by the 
Resource Manager.
+   * 2. Perform the necessary operations to inform relevant runtime components about 
evaluators that are alive
+   * with the set of evaluator IDs recovered in step 1.
+   * 3. Perform the necessary operations to inform relevant runtime components about 
evaluators that have failed
+   * during the driver restart period.
+   */
+  void onRestart();
+
+  /**
+   * Records an evaluator when it is allocated.
+   * @param id The evaluator ID of the allocated evaluator.
+   */
+  void recordAllocatedEvaluator(final String id);
+
+
+  /**
+   * Records a removed evaluator into the evaluator log.
+   * @param id The evaluator ID of the removed evaluator.
+   */
+  void recordRemovedEvaluator(final String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
new file mode 100644
index 0000000..ca9321f
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Records allocated evaluators for recovery on driver restart by using a 
DriverRestartManager.
+ */
+public final class EvaluatorPreservingEvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
+  private final DriverRestartManager driverRestartManager;
+
+  @Inject
+  private EvaluatorPreservingEvaluatorAllocatedHandler(final 
DriverRestartManager driverRestartManager) {
+    this.driverRestartManager = driverRestartManager;
+  }
+
+  /**
+   * Records the ID of the allocated evaluator with the DriverRestartManager.
+   * @param value the allocated evaluator event.
+   */
+  @Override
+  public void onNext(final AllocatedEvaluator value) {
+    this.driverRestartManager.recordAllocatedEvaluator(value.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
new file mode 100644
index 0000000..96536c3
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Removes an evaluator from the evaluators to restart after it has completed.
+ */
+public final class EvaluatorPreservingEvaluatorCompletedHandler implements 
EventHandler<CompletedEvaluator> {
+  private final DriverRestartManager driverRestartManager;
+
+  @Inject
+  private EvaluatorPreservingEvaluatorCompletedHandler(final 
DriverRestartManager driverRestartManager) {
+    this.driverRestartManager = driverRestartManager;
+  }
+
+  /**
+   * Records the completed evaluator's ID as removed with the DriverRestartManager
+   * so that it is not among the evaluators to recover.
+   * @param value The completed evaluator event.
+   */
+  @Override
+  public void onNext(final CompletedEvaluator value) {
+    this.driverRestartManager.recordRemovedEvaluator(value.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
new file mode 100644
index 0000000..0c55759
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Removes an evaluator from the evaluators to restart after it has failed.
+ */
+public final class EvaluatorPreservingEvaluatorFailedHandler implements 
EventHandler<FailedEvaluator> {
+  private final DriverRestartManager driverRestartManager;
+
+  @Inject
+  private EvaluatorPreservingEvaluatorFailedHandler(final DriverRestartManager 
driverRestartManager) {
+    this.driverRestartManager = driverRestartManager;
+  }
+
+  /**
+   * Records the failed evaluator's ID as removed with the DriverRestartManager
+   * so that it is not among the evaluators to recover.
+   * @param value The failed evaluator event.
+   */
+  @Override
+  public void onNext(final FailedEvaluator value) {
+    this.driverRestartManager.recordRemovedEvaluator(value.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
new file mode 100644
index 0000000..bf68f7c
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This package provides the DriverRestartManager interface and restart event service handlers.
+ */
+package org.apache.reef.driver.restart;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 1f0a527..70d0c9b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -20,6 +20,12 @@ package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
+import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
+import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
+import 
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorAllocatedHandler;
+import 
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorCompletedHandler;
+import 
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorFailedHandler;
 import org.apache.reef.tang.formats.*;
 
 /**
@@ -33,6 +39,9 @@ public final class DriverRuntimeRestartConfiguration extends 
ConfigurationModule
   private DriverRuntimeRestartConfiguration() {
   }
 
-  // TODO: bind service handlers in REEF-483
-  public static final ConfigurationModule CONF = new 
DriverRuntimeRestartConfiguration().build();
+  public static final ConfigurationModule CONF = new 
DriverRuntimeRestartConfiguration()
+      .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, 
EvaluatorPreservingEvaluatorAllocatedHandler.class)
+      .bindSetEntry(ServiceEvaluatorFailedHandlers.class, 
EvaluatorPreservingEvaluatorFailedHandler.class)
+      .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, 
EvaluatorPreservingEvaluatorCompletedHandler.class)
+      .build(); // Handlers record evaluator allocations/removals via DriverRestartManager.
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 19c2e7a..bfd52a3 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -19,6 +19,8 @@
 package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
+import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
@@ -31,38 +33,54 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
- * This is bound to the start event of the clock and dispatches it to the 
approriate application code.
+ * This is bound to the start event of the clock and dispatches it to the 
appropriate application code.
  */
 public final class DriverStartHandler implements EventHandler<StartTime> {
   private static final Logger LOG = 
Logger.getLogger(DriverStartHandler.class.getName());
 
   private final Set<EventHandler<StartTime>> startHandlers;
   private final Optional<Set<EventHandler<StartTime>>> restartHandlers;
-  private final DriverStatusManager driverStatusManager;
+  private final Optional<Set<EventHandler<StartTime>>> serviceRestartHandlers;
+  private final Optional<DriverRestartManager> driverRestartManager;
 
   @Inject
   
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
                      final Set<EventHandler<StartTime>> startHandler,
-                     @Parameter(DriverRestartHandler.class) final 
Set<EventHandler<StartTime>> restartHandlers,
+                     @Parameter(DriverRestartHandler.class)
+                     final Set<EventHandler<StartTime>> restartHandlers,
+                     @Parameter(ServiceDriverRestartedHandlers.class)
+                     final Set<EventHandler<StartTime>> serviceRestartHandlers,
+                     final DriverRestartManager driverRestartManager,
                      final DriverStatusManager driverStatusManager) {
-    this.startHandlers = startHandler;
-    this.restartHandlers = Optional.of(restartHandlers);
-    this.driverStatusManager = driverStatusManager;
-    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler 
[{0}] and RestartHandler [{1}]",
-        new String[]{this.startHandlers.toString(), 
this.restartHandlers.toString()});
+    this(startHandler, Optional.of(restartHandlers), 
Optional.of(serviceRestartHandlers),
+        Optional.of(driverRestartManager), driverStatusManager);
+    LOG.log(Level.FINE, "Instantiated `DriverStartHandler` with StartHandlers 
[{0}], RestartHandlers [{1}], " +
+            "and ServiceRestartHandlers [{2}], with a restart manager.",
+        new String[] {this.startHandlers.toString(), 
this.restartHandlers.toString(),
+            this.serviceRestartHandlers.toString()});
   }
 
   @Inject
   
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
-                     final Set<EventHandler<StartTime>> startHandler,
+                     final Set<EventHandler<StartTime>> startHandlers,
                      final DriverStatusManager driverStatusManager) {
-    this.startHandlers = startHandler;
-    this.restartHandlers = Optional.empty();
-    this.driverStatusManager = driverStatusManager;
-    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler 
[{0}] and no RestartHandler",
+    this(startHandlers, Optional.<Set<EventHandler<StartTime>>>empty(),
+        Optional.<Set<EventHandler<StartTime>>>empty(), 
Optional.<DriverRestartManager>empty(), driverStatusManager);
+    LOG.log(Level.FINE, "Instantiated `DriverStartHandler` with StartHandlers 
[{0}] and no restart.",
         this.startHandlers.toString());
   }
 
+  private DriverStartHandler(final Set<EventHandler<StartTime>> startHandler,
+                             final Optional<Set<EventHandler<StartTime>>> 
restartHandlers,
+                             final Optional<Set<EventHandler<StartTime>>> 
serviceRestartHandlers,
+                             final Optional<DriverRestartManager> 
driverRestartManager,
+                             final DriverStatusManager driverStatusManager) { // NOTE(review): currently unused.
+    this.startHandlers = startHandler;
+    this.restartHandlers = restartHandlers;
+    this.serviceRestartHandlers = serviceRestartHandlers;
+    this.driverRestartManager = driverRestartManager;
+  }
+
   @Override
   public void onNext(final StartTime startTime) {
     if (isRestart()) {
@@ -73,10 +91,18 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
   }
 
   private void onRestart(final StartTime startTime) {
-    if (restartHandlers.isPresent()) {
-      for (EventHandler<StartTime> restartHandler : 
this.restartHandlers.get()) {
+    if (this.driverRestartManager.isPresent() && 
this.restartHandlers.isPresent()) {
+      for (EventHandler<StartTime> serviceRestartHandler : 
this.serviceRestartHandlers.get()) {
+        serviceRestartHandler.onNext(startTime);
+      }
+
+      for (EventHandler<StartTime> restartHandler : 
this.restartHandlers.get()) {
         restartHandler.onNext(startTime);
       }
+
+      // This can only be called after calling client restart handlers because 
REEF.NET
+      // JobDriver requires making this call to set up the InterOp handlers.
+      this.driverRestartManager.get().onRestart();
     } else {
       throw new DriverFatalRuntimeException("Driver restart happened, but no 
ON_DRIVER_RESTART handler is bound.");
     }
@@ -89,9 +115,13 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
   }
 
   /**
-   * @return true, if the Driver is in fact being restarted.
+   * @return true, if the configurations enable restart and the Driver is in 
fact being restarted.
    */
   private boolean isRestart() {
-    return this.driverStatusManager.getNumPreviousContainers() > 0;
+    if (this.driverRestartManager.isPresent()) {
+      return this.driverRestartManager.get().isRestart();
+    }
+
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
new file mode 100644
index 0000000..5c51d3c
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+import java.util.Set;
+
+/**
+ * An interface to preserve evaluators across driver restarts.
+ */
+@DriverSide
+@Private
+@RuntimeAuthor
+@Unstable
+public interface EvaluatorPreserver {
+  /**
+   * Called on driver restart when evaluators are to be recovered; returns the IDs of the evaluators to recover.
+   */
+  Set<String> recoverEvaluators();
+
+  /**
+   * Called with an evaluator's ID when that evaluator is to be preserved.
+   */
+  void recordAllocatedEvaluator(final String id);
+
+  /**
+   * Called when an evaluator is to be removed.
+   * @param id the ID of the evaluator to be removed.
+   */
+  void recordRemovedEvaluator(final String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 0e7d0e7..af460d8 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -20,10 +20,6 @@ package org.apache.reef.runtime.yarn.driver;
 
 import com.google.protobuf.ByteString;
 import org.apache.commons.collections.ListUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -36,13 +32,11 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
-import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
-import org.apache.reef.runtime.yarn.util.YarnTypes;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
 import org.apache.reef.wake.remote.Encoder;
@@ -64,10 +58,6 @@ final class YarnContainerManager
 
   private static final String RUNTIME_NAME = "YARN";
 
-  private static final String ADD_FLAG = "+";
-
-  private static final String REMOVE_FLAG = "-";
-
   private final YarnClient yarnClient = YarnClient.createYarnClient();
 
   private final Queue<AMRMClient.ContainerRequest> requestsBeforeSentToRM = 
new ConcurrentLinkedQueue<>();
@@ -133,7 +123,7 @@ final class YarnContainerManager
         new Object[]{id, containers.size(), 
this.containerRequestCounter.get()});
 
     for (final Container container : containers) {
-      handleNewContainer(container, false);
+      handleNewContainer(container);
     }
 
     LOG.log(Level.FINE, "TIME: Processed Containers {0}", id);
@@ -222,7 +212,6 @@ final class YarnContainerManager
     LOG.log(Level.FINE, "Release container: {0}", containerId);
     final Container container = this.containers.removeAndGet(containerId);
     this.resourceManager.releaseAssignedContainer(container.getId());
-    logContainerRemoval(container.getId().toString());
     updateRuntimeStatus();
   }
 
@@ -252,16 +241,6 @@ final class YarnContainerManager
       LOG.log(Level.WARNING, "Unable to register application master.", e);
       onRuntimeError(e);
     }
-
-    // TODO: this is currently being developed on a hacked 2.4.0 bits, should 
be 2.4.1
-    final String minVersionToGetPreviousContainer = "2.4.0";
-
-    // when supported, obtain the list of the containers previously allocated, 
and write info to driver folder
-    if (YarnTypes.isAtOrAfterVersion(minVersionToGetPreviousContainer)) {
-      LOG.log(Level.FINEST, "Hadoop version is {0} or after with support to 
retain previous containers, " +
-          "processing previous containers.", minVersionToGetPreviousContainer);
-      processPreviousContainers();
-    }
   }
 
   void onStop(final Throwable exception) {
@@ -320,48 +299,6 @@ final class YarnContainerManager
     this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
   }
 
-  private void processPreviousContainers() {
-    final List<Container> previousContainers = 
this.registration.getRegistration().getContainersFromPreviousAttempts();
-    if (previousContainers != null && !previousContainers.isEmpty()) {
-      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", 
previousContainers.size());
-      
this.driverStatusManager.setNumPreviousContainers(previousContainers.size());
-      final Set<String> expectedContainers = 
getExpectedContainersFromLogReplay();
-      final int numExpectedContainers = expectedContainers.size();
-      final int numPreviousContainers = previousContainers.size();
-      if (numExpectedContainers > numPreviousContainers) {
-        // we expected more containers to be alive, some containers must have 
died during driver restart
-        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are 
still alive",
-            new Object[]{numExpectedContainers, numPreviousContainers});
-        final Set<String> previousContainersIds = new HashSet<>();
-        for (final Container container : previousContainers) {
-          previousContainersIds.add(container.getId().toString());
-        }
-        for (final String expectedContainerId : expectedContainers) {
-          if (!previousContainersIds.contains(expectedContainerId)) {
-            logContainerRemoval(expectedContainerId);
-            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must 
have failed during driver restart.",
-                expectedContainerId);
-            informAboutConatinerFailureDuringRestart(expectedContainerId);
-          }
-        }
-      }
-      if (numExpectedContainers < numPreviousContainers) {
-        // somehow we have more alive evaluators, this should not happen
-        throw new RuntimeException("Expected only [" + numExpectedContainers + 
"] containers " +
-            "but resource manager believe that [" + numPreviousContainers + "] 
are outstanding for driver.");
-      }
-
-      //  numExpectedContainers == numPreviousContainers
-      for (final Container container : previousContainers) {
-        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
-        if (!expectedContainers.contains(container.getId().toString())) {
-          throw new DriverFatalRuntimeException("Not expecting container " + 
container.getId().toString());
-        }
-        handleNewContainer(container, true);
-      }
-    }
-  }
-
   /**
    * Handles container status reports. Calls come from YARN.
    *
@@ -394,7 +331,6 @@ final class YarnContainerManager
         status.setExitCode(value.getExitStatus());
           // remove the completed container (can be either done/killed/failed) 
from book keeping
         this.containers.removeAndGet(containerId);
-        logContainerRemoval(containerId);
         break;
       default:
         LOG.info("Container running");
@@ -426,53 +362,48 @@ final class YarnContainerManager
    *
    * @param container newly allocated
    */
-  private void handleNewContainer(final Container container, final boolean 
isRecoveredContainer) {
+  private void handleNewContainer(final Container container) {
 
     LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
-    // recovered container is not new allocation, it is just checking back 
from previous driver failover
-    if (!isRecoveredContainer) {
-      synchronized (this) {
-        if (matchContainerWithPendingRequest(container)) {
-          final AMRMClient.ContainerRequest matchedRequest = 
this.requestsAfterSentToRM.peek();
-          this.containerRequestCounter.decrement();
-          this.containers.add(container);
-
-          LOG.log(Level.FINEST, "{0} matched with {1}", new 
Object[]{container.toString(), matchedRequest.toString()});
-
-          // Due to the bug YARN-314 and the workings of AMRMCClientAsync, 
when x-priority m-capacity zero-container
-          // request and x-priority n-capacity nonzero-container request are 
sent together, where m > n, RM ignores
-          // the latter.
-          // Therefore it is necessary avoid sending zero-container request, 
even it means getting extra containers.
-          // It is okay to send nonzero m-capacity and n-capacity request 
together since bigger containers
-          // can be matched.
-          // TODO: revisit this when implementing locality-strictness (i.e. a 
specific rack request can be ignored)
-          if (this.requestsAfterSentToRM.size() > 1) {
-            try {
-              this.resourceManager.removeContainerRequest(matchedRequest);
-            } catch (final Exception e) {
-              LOG.log(Level.WARNING, "Nothing to remove from Async AMRM 
client's queue, " +
-                  "removal attempt failed with exception", e);
-            }
+    synchronized (this) {
+      if (matchContainerWithPendingRequest(container)) {
+        final AMRMClient.ContainerRequest matchedRequest = 
this.requestsAfterSentToRM.peek();
+        this.containerRequestCounter.decrement();
+        this.containers.add(container);
+
+        LOG.log(Level.FINEST, "{0} matched with {1}", new 
Object[]{container.toString(), matchedRequest.toString()});
+
+        // Due to the bug YARN-314 and the workings of AMRMCClientAsync, when 
x-priority m-capacity zero-container
+        // request and x-priority n-capacity nonzero-container request are 
sent together, where m > n, RM ignores
+        // the latter.
+        // Therefore it is necessary avoid sending zero-container request, 
even it means getting extra containers.
+        // It is okay to send nonzero m-capacity and n-capacity request 
together since bigger containers
+        // can be matched.
+        // TODO: revisit this when implementing locality-strictness (i.e. a 
specific rack request can be ignored)
+        if (this.requestsAfterSentToRM.size() > 1) {
+          try {
+            this.resourceManager.removeContainerRequest(matchedRequest);
+          } catch (final Exception e) {
+            LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's 
queue, " +
+                "removal attempt failed with exception", e);
           }
-
-          this.requestsAfterSentToRM.remove();
-          doHomogeneousRequests();
-
-          LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core 
number = {1}",
-              new Object[]{container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
-          
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
-              .setIdentifier(container.getId().toString())
-              .setNodeId(container.getNodeId().toString())
-              .setResourceMemory(container.getResource().getMemory())
-              .setVirtualCores(container.getResource().getVirtualCores())
-              .build());
-          // we only add this to Container log after the Container has been 
registered as an REEF Evaluator.
-          logContainerAddition(container.getId().toString());
-          this.updateRuntimeStatus();
-        } else {
-          LOG.log(Level.WARNING, "Got an extra container {0} that doesn't 
match, releasing...", container.getId());
-          this.resourceManager.releaseAssignedContainer(container.getId());
         }
+
+        this.requestsAfterSentToRM.remove();
+        doHomogeneousRequests();
+
+        LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number 
= {1}",
+            new Object[]{container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
+        
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
+            .setIdentifier(container.getId().toString())
+            .setNodeId(container.getNodeId().toString())
+            .setResourceMemory(container.getResource().getMemory())
+            .setVirtualCores(container.getResource().getVirtualCores())
+            .build());
+        this.updateRuntimeStatus();
+      } else {
+        LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match, 
releasing...", container.getId());
+        this.resourceManager.releaseAssignedContainer(container.getId());
       }
     }
   }
@@ -564,138 +495,4 @@ final class YarnContainerManager
 
     this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
   }
-
-  private Set<String> getExpectedContainersFromLogReplay() {
-    final org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
-    config.setBoolean("dfs.support.append", true);
-    config.setBoolean("dfs.support.broken.append", true);
-    final Set<String> expectedContainers = new HashSet<>();
-    try {
-      final FileSystem fs = FileSystem.get(config);
-      final Path path = new Path(getChangeLogLocation());
-      if (!fs.exists(path)) {
-        // empty set
-        return expectedContainers;
-      } else {
-        final BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
-        String line = br.readLine();
-        while (line != null) {
-          if (line.startsWith(ADD_FLAG)) {
-            final String containerId = line.substring(ADD_FLAG.length());
-            if (expectedContainers.contains(containerId)) {
-              throw new DriverFatalRuntimeException("Duplicated add container 
record found in the " +
-                  "change log for container " + containerId);
-            }
-            expectedContainers.add(containerId);
-          } else if (line.startsWith(REMOVE_FLAG)) {
-            final String containerId = line.substring(REMOVE_FLAG.length());
-            if (!expectedContainers.contains(containerId)) {
-              throw new DriverFatalRuntimeException("Change log includes 
record that try to " +
-                  "remove non-exist or duplicate remove record for container + 
" + containerId);
-            }
-            expectedContainers.remove(containerId);
-          }
-          line = br.readLine();
-        }
-        br.close();
-      }
-    } catch (final IOException e) {
-      throw new DriverFatalRuntimeException("Cannot read from log file", e);
-    }
-    return expectedContainers;
-  }
-
-  private void informAboutConatinerFailureDuringRestart(final String 
containerId) {
-    LOG.log(Level.WARNING, "Container [" + containerId +
-        "] has failed during driver restart process, FailedEvaluatorHandler 
will be triggered, but " +
-        "no additional evaluator can be requested due to YARN-2433.");
-    // trigger a failed evaluator event
-    
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
-        .setIdentifier(containerId)
-        .setState(ReefServiceProtos.State.FAILED)
-        .setExitCode(1)
-        .setDiagnostics("Container [" + containerId + "] failed during driver 
restart process.")
-        .setIsFromPreviousDriver(true)
-        .build());
-  }
-
-  private synchronized void writeToEvaluatorLog(final String entry) throws 
IOException {
-    final org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
-    config.setBoolean("dfs.support.append", true);
-    config.setBoolean("dfs.support.broken.append", true);
-    final FileSystem fs = getFileSystemInstance();
-    final Path path = new Path(getChangeLogLocation());
-    final boolean appendToLog = fs.exists(path);
-
-    try (
-        final BufferedWriter bw = appendToLog ?
-            new BufferedWriter(new OutputStreamWriter(fs.append(path))) :
-            new BufferedWriter(new OutputStreamWriter(fs.create(path)));
-    ) {
-      bw.write(entry);
-    } catch (final IOException e) {
-      if (appendToLog) {
-        LOG.log(Level.FINE, "Unable to add an entry to the Evaluator log. 
Attempting append by delete and recreate", e);
-        appendByDeleteAndCreate(fs, path, entry);
-      }
-    }
-  }
-
-  private FileSystem getFileSystemInstance() throws IOException {
-    final org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
-    config.setBoolean("dfs.support.append", true);
-    config.setBoolean("dfs.support.broken.append", true);
-    return FileSystem.get(config);
-  }
-
-  /**
-   * For certain HDFS implementation, the append operation may not be 
supported (e.g., Azure blob - wasb)
-   * in this case, we will emulate the append operation by reading the 
content, appending entry at the end,
-   * then recreating the file with appended content.
-   *
-   * @throws java.io.IOException when the file can't be written.
-   */
-
-  private void appendByDeleteAndCreate(final FileSystem fs, final Path path, 
final String appendEntry)
-      throws IOException {
-    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
-    try (final InputStream inputStream = fs.open(path)) {
-      IOUtils.copyBytes(inputStream, outputStream, 4096, true);
-    }
-
-    final String newContent = outputStream.toString() + appendEntry;
-    fs.delete(path, true);
-
-    try (final FSDataOutputStream newOutput = fs.create(path);
-         final InputStream newInput = new 
ByteArrayInputStream(newContent.getBytes())) {
-      IOUtils.copyBytes(newInput, newOutput, 4096, true);
-    }
-
-  }
-
-  private String getChangeLogLocation() {
-    return "/ReefApplications/" + EvaluatorManager.getJobIdentifier() + 
"/evaluatorsChangesLog";
-  }
-
-  private void logContainerAddition(final String containerId) {
-    final String entry = ADD_FLAG + containerId + System.lineSeparator();
-    logContainerChange(entry);
-  }
-
-  private void logContainerRemoval(final String containerId) {
-    final String entry = REMOVE_FLAG + containerId + System.lineSeparator();
-    logContainerChange(entry);
-  }
-
-  private void logContainerChange(final String entry) {
-    try {
-      writeToEvaluatorLog(entry);
-    } catch (final IOException e) {
-      final String errorMsg = "Unable to log the change of container [" + 
entry +
-          "] to the container log. Driver restart won't work properly.";
-      LOG.log(Level.WARNING, errorMsg, e);
-      throw new DriverFatalRuntimeException(errorMsg);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
index 7c11fec..5f10bf8 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
@@ -19,11 +19,16 @@
 package org.apache.reef.runtime.yarn.driver;
 
 import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.runtime.common.driver.DriverRuntimeRestartConfiguration;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
 import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
 
 /**
  * Use this ConfigurationModule to configure YARN-specific Restart options for 
the driver.
@@ -32,12 +37,19 @@ import 
org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 @ClientSide
 @Public
 @Provided
+@Unstable
 public final class YarnDriverRestartConfiguration extends 
ConfigurationModuleBuilder {
   /**
+   * The Evaluator Preserver implementation used for YARN. Defaults to 
DFSEvaluatorPreserver.
+   */
+  public static final OptionalImpl<EvaluatorPreserver> EVALUATOR_PRESERVER = 
new OptionalImpl<>();
+
+  /**
    * This event is fired in place of the ON_DRIVER_STARTED when the Driver is 
in fact restarted after failure.
    */
-    // TODO: Bind runtime-specific restart logic for REEF-483.
   public static final ConfigurationModule CONF = new 
YarnDriverRestartConfiguration()
+      .bindNamedParameter(YarnEvaluatorPreserver.class, EVALUATOR_PRESERVER)
+      .bindImplementation(DriverRestartManager.class, 
YarnDriverRestartManager.class)
       .merge(DriverRuntimeRestartConfiguration.CONF)
       .build();
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
new file mode 100644
index 0000000..d6be373
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The implementation of restart manager for YARN. Handles evaluator 
preservation as well
+ * as evaluator recovery on YARN.
+ */
+@DriverSide
+@RuntimeAuthor
+@Private
+@Unstable
+public final class YarnDriverRestartManager implements DriverRestartManager {
+
+  private static final Logger LOG = 
Logger.getLogger(YarnDriverRestartManager.class.getName());
+
+  private final EvaluatorPreserver evaluatorPreserver;
+  private final ApplicationMasterRegistration registration;
+  private final DriverStatusManager driverStatusManager;
+  private final REEFEventHandlers reefEventHandlers;
+  private List<Container> previousContainers;
+
+  @Inject
+  private YarnDriverRestartManager(@Parameter(YarnEvaluatorPreserver.class)
+                           final EvaluatorPreserver evaluatorPreserver,
+                           final REEFEventHandlers reefEventHandlers,
+                           final ApplicationMasterRegistration registration,
+                           final DriverStatusManager driverStatusManager){
+    this.registration = registration;
+    this.evaluatorPreserver = evaluatorPreserver;
+    this.driverStatusManager = driverStatusManager;
+    this.reefEventHandlers = reefEventHandlers;
+    this.previousContainers = null;
+  }
+
+  @Override
+  public boolean isRestart() {
+    // TODO [REEF-508]: Determine restart based on environment variable as 
provided by YARN.
+    if (this.previousContainers == null) {
+      this.previousContainers = 
this.registration.getRegistration().getContainersFromPreviousAttempts();
+
+      // If it's still null, create an empty list to indicate that it's not a 
restart.
+      if (this.previousContainers == null) {
+        this.previousContainers = new ArrayList<>();
+      }
+    }
+
+    return !this.previousContainers.isEmpty();
+  }
+
+  @Override
+  public void onRestart() {
+    final Set<String> recoveredEvaluators = new HashSet<>();
+    final Set<String> failedEvaluators = new HashSet<>();
+
+    if (this.previousContainers != null && !this.previousContainers.isEmpty()) 
{
+      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", 
this.previousContainers.size());
+      final Set<String> expectedContainers = 
this.evaluatorPreserver.recoverEvaluators();
+
+      final int numExpectedContainers = expectedContainers.size();
+      final int numPreviousContainers = this.previousContainers.size();
+      if (numExpectedContainers > numPreviousContainers) {
+        // we expected more containers to be alive, some containers must have 
died during driver restart
+        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are 
still alive",
+            new Object[]{numExpectedContainers, numPreviousContainers});
+        final Set<String> previousContainersIds = new HashSet<>();
+        for (final Container container : this.previousContainers) {
+          previousContainersIds.add(container.getId().toString());
+        }
+        for (final String expectedContainerId : expectedContainers) {
+          if (!previousContainersIds.contains(expectedContainerId)) {
+            
this.evaluatorPreserver.recordRemovedEvaluator(expectedContainerId);
+            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must 
have failed during driver restart.",
+                expectedContainerId);
+            failedEvaluators.add(expectedContainerId);
+          }
+        }
+      }
+      if (numExpectedContainers < numPreviousContainers) {
+        // somehow we have more alive evaluators, this should not happen
+        throw new RuntimeException("Expected only [" + numExpectedContainers + 
"] containers " +
+            "but resource manager believe that [" + numPreviousContainers + "] 
are outstanding for driver.");
+      }
+
+      //  numExpectedContainers == numPreviousContainers
+      for (final Container container : this.previousContainers) {
+        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
+        if (!expectedContainers.contains(container.getId().toString())) {
+          throw new RuntimeException("Not expecting container " + 
container.getId().toString());
+        }
+
+        recoveredEvaluators.add(container.getId().toString());
+      }
+    }
+
+    this.informAboutEvaluatorAlive(recoveredEvaluators);
+    this.informAboutEvaluatorFailures(failedEvaluators);
+  }
+
+  private void informAboutEvaluatorAlive(final Set<String> evaluatorIds) {
+    // We will wait for these evaluators to contact us, so we do not need to 
record the entire container information.
+    this.driverStatusManager.setNumPreviousContainers(evaluatorIds.size());
+  }
+
+  private void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
+    for (String evaluatorId : evaluatorIds) {
+      LOG.log(Level.WARNING, "Container [" + evaluatorId +
+          "] has failed during driver restart process, FailedEvaluatorHandler 
will be triggered, but " +
+          "no additional evaluator can be requested due to YARN-2433.");
+      // trigger a failed evaluator event
+      
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
+          .setIdentifier(evaluatorId)
+          .setState(ReefServiceProtos.State.FAILED)
+          .setExitCode(1)
+          .setDiagnostics("Container [" + evaluatorId + "] failed during 
driver restart process.")
+          .setIsFromPreviousDriver(true)
+          .build());
+    }
+  }
+
+  @Override
+  public void recordAllocatedEvaluator(final String id) {
+    this.evaluatorPreserver.recordAllocatedEvaluator(id);
+  }
+
+  @Override
+  public void recordRemovedEvaluator(final String id) {
+    this.evaluatorPreserver.recordRemovedEvaluator(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
new file mode 100644
index 0000000..9b07f0b
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.parameters;
+
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.yarn.driver.restart.DFSEvaluatorPreserver;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Evaluator Preserver to use on YARN, defaults to DFS.
+ */
+@NamedParameter(doc = "The Evaluator Preserver to use on YARN, defaults to 
DFS.",
+    default_class = DFSEvaluatorPreserver.class)
+public final class YarnEvaluatorPreserver implements Name<EvaluatorPreserver> {
+  private YarnEvaluatorPreserver() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
new file mode 100644
index 0000000..b88f4a0
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * A {@link DFSEvaluatorLogWriter} that appends every entry directly to the change log file.
+ * The underlying file system must support append (dfs.support.append should be true).
+ */
+@Private
+public final class DFSEvaluatorLogAppendWriter implements DFSEvaluatorLogWriter {
+
+  private final FileSystem fileSystem;
+
+  private final Path changelogPath;
+
+  private boolean fsClosed = false;
+
+  DFSEvaluatorLogAppendWriter(final FileSystem fileSystem, final Path changelogPath) {
+    this.fileSystem = fileSystem;
+    this.changelogPath = changelogPath;
+  }
+
+  /**
+   * Appends one formatted entry (an Evaluator addition or removal) to the DFS evaluator log.
+   * The log file is created by the first write and appended to by every later write.
+   * @param formattedEntry The entry text, already carrying the evaluator ID and its addition/removal marker.
+   * @throws IOException if the log cannot be created, opened for append, or written.
+   */
+  @Override
+  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
+    final boolean logAlreadyExists = this.fileSystem.exists(this.changelogPath);
+
+    // Exactly one stream is opened: append to an existing log, otherwise create a new one.
+    try (final BufferedWriter logWriter = new BufferedWriter(new OutputStreamWriter(logAlreadyExists ?
+        this.fileSystem.append(this.changelogPath) :
+        this.fileSystem.create(this.changelogPath)))) {
+      logWriter.write(formattedEntry);
+    }
+  }
+
+  /**
+   * Closes the underlying FileSystem. Calls made after a successful close do nothing.
+   * @throws Exception if closing the FileSystem fails.
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.fileSystem == null || this.fsClosed) {
+      return;
+    }
+    this.fileSystem.close();
+    this.fsClosed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
new file mode 100644
index 0000000..e461b45
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.*;
+
+/**
+ * A {@link DFSEvaluatorLogWriter} for file systems that do not support append
+ * (dfs.support.append should be false). Append is emulated by rewriting the whole log file.
+ */
+@Private
+public final class DFSEvaluatorLogOverwriteWriter implements DFSEvaluatorLogWriter {
+
+  /** Buffer size used when copying the existing log content into memory. */
+  private static final int COPY_BUFFER_SIZE = 4096;
+
+  private final FileSystem fileSystem;
+
+  private final Path changelogPath;
+
+  private boolean fsClosed = false;
+
+  DFSEvaluatorLogOverwriteWriter(final FileSystem fileSystem, final Path changelogPath) {
+    this.fileSystem = fileSystem;
+    this.changelogPath = changelogPath;
+  }
+
+  /**
+   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
+   * If the log does not exist yet, it is created with this entry. Otherwise the entry is appended by
+   * reading the log, adding the entry, and overwriting the entire log.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
+   * @throws IOException when file cannot be written.
+   */
+  @Override
+  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
+    if (!this.fileSystem.exists(this.changelogPath)) {
+      try (final BufferedWriter bw =
+               new BufferedWriter(new OutputStreamWriter(this.fileSystem.create(this.changelogPath)))) {
+        bw.write(formattedEntry);
+      }
+    } else {
+      this.appendByDeleteAndCreate(formattedEntry);
+    }
+  }
+
+  /**
+   * Emulates append on file systems that lack it (e.g., Azure blob - wasb). It reads the current content,
+   * adds the entry at the end, then recreates the file with the combined content.
+   * <p>
+   * The existing content is copied as raw bytes, so it is never decoded and re-encoded.
+   * This operation is not atomic: if the process dies after the delete and before the create completes,
+   * the previously logged entries are lost.
+   *
+   * @param appendEntry the entry to add at the end of the log.
+   * @throws java.io.IOException when the file can't be read or written.
+   */
+  private void appendByDeleteAndCreate(final String appendEntry) throws IOException {
+    final ByteArrayOutputStream content = new ByteArrayOutputStream();
+
+    // try-with-resources owns closing the input stream, so copyBytes must not close it as well.
+    try (final InputStream inputStream = this.fileSystem.open(this.changelogPath)) {
+      IOUtils.copyBytes(inputStream, content, COPY_BUFFER_SIZE, false);
+    }
+
+    // Only the new entry is encoded. It uses the default charset, like the OutputStreamWriter on the create path.
+    content.write(appendEntry.getBytes());
+
+    this.fileSystem.delete(this.changelogPath, true);
+
+    try (final FSDataOutputStream newOutput = this.fileSystem.create(this.changelogPath)) {
+      content.writeTo(newOutput);
+    }
+  }
+
+  /**
+   * Closes the FileSystem. Calls made after a successful close do nothing.
+   * @throws Exception if closing the FileSystem fails.
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.fileSystem != null && !this.fsClosed) {
+      this.fileSystem.close();
+      this.fsClosed = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
new file mode 100644
index 0000000..b5e3881
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.IOException;
+
+/**
+ * Writes Evaluator change log entries to the DFS. Implementations either append natively
+ * or emulate append by overwriting the log file.
+ * <p>
+ * Log entries are meant to be immutable, and no entry should ever be deleted. To record that an
+ * Evaluator is gone, write a removal entry instead.
+ */
+@Private
+public interface DFSEvaluatorLogWriter extends AutoCloseable {
+
+  /**
+   * Writes one formatted entry (an Evaluator addition or removal) to the DFS evaluator log.
+   * @param formattedEntry The entry text, carrying the evaluator ID and its addition/removal information.
+   * @throws IOException if the entry cannot be written.
+   */
+  void writeToEvaluatorLog(String formattedEntry) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
new file mode 100644
index 0000000..409e392
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
+import org.apache.reef.driver.parameters.FailDriverOnEvaluatorLogErrors;
+import org.apache.reef.exception.DriverFatalRuntimeException;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link EvaluatorPreserver} that keeps a change log of Evaluator additions and removals on the DFS
+ * used by YARN. A restarted Driver uses this log to recover the set of Evaluators that were still alive.
+ * <p>
+ * Each log line is an Evaluator ID prefixed with {@code "+"} (allocated) or {@code "-"} (removed).
+ * {@link FailDriverOnEvaluatorLogErrors} controls whether I/O failures are fatal to the Driver or only logged.
+ */
+@DriverSide
+@RuntimeAuthor
+@Unstable
+public final class DFSEvaluatorPreserver implements EvaluatorPreserver, AutoCloseable {
+  private static final Logger LOG = Logger.getLogger(DFSEvaluatorPreserver.class.getName());
+
+  /** Prefix of a log line recording an allocated Evaluator. */
+  private static final String ADD_FLAG = "+";
+
+  /** Prefix of a log line recording a removed Evaluator. */
+  private static final String REMOVE_FLAG = "-";
+
+  private final boolean failDriverOnEvaluatorLogErrors;
+
+  private DFSEvaluatorLogWriter writer;
+
+  private Path changeLogLocation;
+
+  private FileSystem fileSystem;
+
+  private boolean writerClosed = false;
+
+  @Inject
+  DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
+                        final boolean failDriverOnEvaluatorLogErrors) {
+    this(failDriverOnEvaluatorLogErrors, "/ReefApplications/" + EvaluatorManager.getJobIdentifier());
+  }
+
+  @Inject
+  private DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
+                                final boolean failDriverOnEvaluatorLogErrors,
+                                @Parameter(DriverJobSubmissionDirectory.class)
+                                final String jobSubmissionDirectory) {
+
+    this.failDriverOnEvaluatorLogErrors = failDriverOnEvaluatorLogErrors;
+
+    try {
+      final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+      // close() closes this FileSystem, so it must not be the instance that FileSystem.get may share
+      // (cached) with other code in the same JVM. newInstance returns an instance owned by this object.
+      this.fileSystem = FileSystem.newInstance(config);
+      this.changeLogLocation =
+          new Path(StringUtils.stripEnd(jobSubmissionDirectory, "/") + "/evaluatorsChangesLog");
+
+      final boolean appendSupported = config.getBoolean("dfs.support.append", false);
+
+      if (appendSupported) {
+        this.writer = new DFSEvaluatorLogAppendWriter(this.fileSystem, this.changeLogLocation);
+      } else {
+        this.writer = new DFSEvaluatorLogOverwriteWriter(this.fileSystem, this.changeLogLocation);
+      }
+    } catch (final IOException e) {
+      final String errMsg = "Unable to instantiate FileSystem with Exception " + e +
+          ", evaluators will not be recovered.";
+      final String fatalMsg = "Driver was not able to instantiate FileSystem.";
+
+      this.handleException(e, errMsg, fatalMsg);
+      // Only reached when failures are not fatal: disable preservation entirely.
+      this.fileSystem = null;
+      this.changeLogLocation = null;
+      this.writer = null;
+    }
+  }
+
+  /**
+   * Recovers the set of evaluators that are alive by replaying the change log.
+   * @return the IDs of Evaluators that were added and not subsequently removed. The set is empty if
+   *         there is no log, no FileSystem, or (in non-fatal mode) the log could not be read.
+   */
+  @Override
+  public synchronized Set<String> recoverEvaluators() {
+    final Set<String> expectedContainers = new HashSet<>();
+    if (this.fileSystem == null || this.changeLogLocation == null) {
+      LOG.log(Level.WARNING, "Unable to recover evaluators due to failure to instantiate FileSystem. Returning an" +
+          " empty set.");
+      return expectedContainers;
+    }
+
+    try {
+      if (!this.fileSystem.exists(this.changeLogLocation)) {
+        // Nothing has been logged yet, so there is nothing to recover.
+        return expectedContainers;
+      }
+
+      // try-with-resources so the reader is closed even when a read fails midway.
+      try (final BufferedReader br =
+               new BufferedReader(new InputStreamReader(this.fileSystem.open(this.changeLogLocation)))) {
+        String line;
+        while ((line = br.readLine()) != null) {
+          applyLogEntry(expectedContainers, line);
+        }
+      }
+    } catch (final IOException e) {
+      final String errMsg = "Cannot read from log file with Exception " + e +
+          ", evaluators will not be recovered.";
+
+      final String fatalMsg = "Cannot read from evaluator log.";
+
+      this.handleException(e, errMsg, fatalMsg);
+    }
+    return expectedContainers;
+  }
+
+  /**
+   * Applies one change log line to the set of Evaluators currently believed alive.
+   * Lines that start with neither flag are ignored.
+   */
+  private static void applyLogEntry(final Set<String> expectedContainers, final String line) {
+    if (line.startsWith(ADD_FLAG)) {
+      final String containerId = line.substring(ADD_FLAG.length());
+      if (!expectedContainers.add(containerId)) {
+        LOG.log(Level.WARNING, "Duplicated add container record found in the change log for container " +
+            containerId);
+      }
+    } else if (line.startsWith(REMOVE_FLAG)) {
+      final String containerId = line.substring(REMOVE_FLAG.length());
+      if (!expectedContainers.remove(containerId)) {
+        LOG.log(Level.WARNING, "Change log includes record that try to remove non-exist or duplicate " +
+            "remove record for container " + containerId);
+      }
+    }
+  }
+
+  /**
+   * Adds the allocated evaluator entry to the evaluator log. Does nothing if the FileSystem is unavailable.
+   * @param id the Evaluator ID.
+   */
+  @Override
+  public synchronized void recordAllocatedEvaluator(final String id) {
+    if (this.fileSystem != null && this.changeLogLocation != null) {
+      final String entry = ADD_FLAG + id + System.lineSeparator();
+      this.logContainerChange(entry);
+    }
+  }
+
+  /**
+   * Adds the removed evaluator entry to the evaluator log. Does nothing if the FileSystem is unavailable.
+   * @param id the Evaluator ID.
+   */
+  @Override
+  public synchronized void recordRemovedEvaluator(final String id) {
+    if (this.fileSystem != null && this.changeLogLocation != null) {
+      final String entry = REMOVE_FLAG + id + System.lineSeparator();
+      this.logContainerChange(entry);
+    }
+  }
+
+  /**
+   * Writes one entry through the configured writer and routes any I/O failure to {@link #handleException}.
+   */
+  private void logContainerChange(final String entry) {
+    try {
+      this.writer.writeToEvaluatorLog(entry);
+    } catch (final IOException e) {
+      final String errorMsg = "Unable to log the change of container [" + entry +
+          "] to the container log. Driver restart won't work properly.";
+
+      final String fatalMsg = "Unable to log container change.";
+
+      this.handleException(e, errorMsg, fatalMsg);
+    }
+  }
+
+  /**
+   * Reports an Evaluator log failure.
+   * <p>
+   * If {@link FailDriverOnEvaluatorLogErrors} is false, the failure is logged at WARNING and execution continues.
+   * Otherwise it is logged at SEVERE, resources are closed, and a {@link DriverFatalRuntimeException} is thrown.
+   *
+   * @param e the underlying failure.
+   * @param errorMsg the message to log.
+   * @param fatalMsg the message of the thrown exception; a generic message is used if null.
+   * @throws DriverFatalRuntimeException if failures are configured to be fatal.
+   */
+  private void handleException(final Exception e, final String errorMsg, final String fatalMsg) {
+    if (!this.failDriverOnEvaluatorLogErrors) {
+      // Non-fatal mode: report the problem and let the Driver continue without reliable preservation.
+      LOG.log(Level.WARNING, errorMsg, e);
+      return;
+    }
+
+    LOG.log(Level.SEVERE, errorMsg, e);
+
+    try {
+      this.close();
+    } catch (final Exception closeException) {
+      LOG.log(Level.SEVERE, "Failed on closing resource.", closeException);
+    }
+
+    throw new DriverFatalRuntimeException(
+        fatalMsg != null ? fatalMsg : "Driver failed on Evaluator log error.", e);
+  }
+
+  /**
+   * Closes the writer, which in turn closes the FileSystem. Calls made after a successful close do nothing.
+   * @throws Exception if closing the writer fails.
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.writer != null && !this.writerClosed) {
+      this.writer.close();
+      this.writerClosed = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
new file mode 100644
index 0000000..8dde2fb
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes used by the YARN runtime specifically on Driver restart. This includes the DFS-backed Evaluator
+ * preserver and the writers that record Evaluator additions and removals in its change log.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
\ No newline at end of file

Reply via email to