Repository: incubator-reef
Updated Branches:
  refs/heads/master 7395bc681 -> 59489156c


[REEF-587] Create a default implementation for DriverRestartManager

This addressed the issue by
  * Changing DriverRestartManager from an interface to a final class.
  * Removing optional DriverRestartManager parameters.
  * Adding a default DriverRuntimeRestartManager that fails on any
    restart related options.

JIRA:
  [REEF-587](https://issues.apache.org/jira/browse/REEF-587)

Pull Request:
  This closes #371


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/59489156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/59489156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/59489156

Branch: refs/heads/master
Commit: 59489156c6d42f24997217cd33e9f047e568dd8c
Parents: 7395bc6
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 13 16:48:43 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 14 13:05:09 2015 -0700

----------------------------------------------------------------------
 .../DefaultDriverRuntimeRestartMangerImpl.java  |  70 ++++++++++
 .../driver/restart/DriverRestartManager.java    |  90 +++++++++++--
 .../restart/DriverRestartManagerImpl.java       | 132 -------------------
 .../driver/restart/DriverRestartUtilities.java  |  16 +--
 .../restart/DriverRuntimeRestartManager.java    |   4 +-
 .../DriverRuntimeRestartConfiguration.java      |   1 -
 .../common/driver/DriverStartHandler.java       |  57 ++------
 .../driver/context/ContextRepresenters.java     |  14 +-
 .../driver/evaluator/EvaluatorManager.java      |  66 +---------
 .../common/driver/task/TaskRepresenter.java     |   4 +-
 10 files changed, 179 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
new file mode 100644
index 0000000..312c183
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.exception.DriverFatalRuntimeException;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * The default driver runtime restart manager that is not able to perform any 
restart actions.
+ * Thus, when performing actions pertaining to restart, it is recommended to 
call static functions in
+ * {@link DriverRestartUtilities} or call canRestart() first.
+ */
+@Private
+@DriverSide
+@Unstable
+final class DefaultDriverRuntimeRestartMangerImpl implements 
DriverRuntimeRestartManager {
+  @Inject
+  private DefaultDriverRuntimeRestartMangerImpl() {
+  }
+
+  @Override
+  public boolean isRestart() {
+    return false;
+  }
+
+  @Override
+  public void recordAllocatedEvaluator(final String id) {
+    throw new DriverFatalRuntimeException(
+        "Restart is not enabled. recordAllocatedEvaluator should not have been 
called.");
+  }
+
+  @Override
+  public void recordRemovedEvaluator(final String id) {
+    throw new DriverFatalRuntimeException(
+        "Restart is not enabled. recordRemovedEvaluator should not have been 
called.");
+  }
+
+  @Override
+  public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
+    throw new DriverFatalRuntimeException(
+        "Restart is not enabled. getAliveAndFailedEvaluators should not have 
been called.");
+  }
+
+  @Override
+  public void informAboutEvaluatorFailures(final Set<String> 
failedEvaluatorIds) {
+    throw new DriverFatalRuntimeException(
+        "Restart is not enabled. informAboutEvaluatorFailures should not have 
been called.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 57c31c8..c517b04 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -21,8 +21,14 @@ package org.apache.reef.driver.restart;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.exception.DriverFatalRuntimeException;
 
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * The manager that handles aspects of driver restart such as determining 
whether the driver is in
@@ -31,29 +37,61 @@ import java.util.Set;
 @DriverSide
 @Private
 @Unstable
-public interface DriverRestartManager {
+public final class DriverRestartManager {
+  private static final Logger LOG = 
Logger.getLogger(DriverRestartManager.class.getName());
+  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+  private final Set<String> previousEvaluators;
+  private final Set<String> recoveredEvaluators;
+  private DriverRestartState state;
+
+  @Inject
+  private DriverRestartManager(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
+    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
+    this.state = DriverRestartState.NotRestarted;
+    this.previousEvaluators = new HashSet<>();
+    this.recoveredEvaluators = new HashSet<>();
+  }
 
   /**
    * @return Whether or not the driver instance is a restarted instance.
    */
-  boolean isRestart();
+  public synchronized boolean isRestart() {
+    if (this.state.isRestart()) {
+      return true;
+    }
+
+    if (driverRuntimeRestartManager.isRestart()) {
+      this.state = DriverRestartState.RestartBegan;
+      return true;
+    }
+
+    return false;
+  }
 
   /**
    * Recovers the list of alive and failed evaluators and inform about 
evaluator failures
    * based on the specific runtime. Also sets the expected amount of 
evaluators to report back
    * as alive to the job driver.
    */
-  void onRestart();
+  public synchronized void onRestart() {
+    final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
+    setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
+    
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+  }
 
   /**
    * @return whether restart is completed.
    */
-  boolean isRestartCompleted();
+  public synchronized boolean isRestartCompleted() {
+    return this.state == DriverRestartState.RestartCompleted;
+  }
 
   /**
    * @return the Evaluators expected to check in from a previous run.
    */
-  Set<String> getPreviousEvaluatorIds();
+  public synchronized Set<String> getPreviousEvaluatorIds() {
+    return Collections.unmodifiableSet(this.previousEvaluators);
+  }
 
   /**
    * Set the Evaluators to expect still active from a previous execution of 
the Driver in a restart situation.
@@ -61,29 +99,61 @@ public interface DriverRestartManager {
    *
    * @param ids the evaluator IDs of the evaluators that are expected to have 
survived driver restart.
    */
-  void setPreviousEvaluatorIds(final Set<String> ids);
+  public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
+    if (this.state != DriverRestartState.RestartInProgress) {
+      previousEvaluators.addAll(ids);
+      this.state = DriverRestartState.RestartInProgress;
+    } else {
+      final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
+      LOG.log(Level.SEVERE, errMsg);
+      throw new DriverFatalRuntimeException(errMsg);
+    }
+  }
 
   /**
    * @return the IDs of the Evaluators from a previous Driver that have 
checked in with the Driver
    * in a restart situation.
    */
-  Set<String> getRecoveredEvaluatorIds();
+  public synchronized Set<String> getRecoveredEvaluatorIds() {
+    return Collections.unmodifiableSet(this.previousEvaluators);
+  }
 
   /**
    * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
    * @return true if the driver restart is completed.
    */
-  boolean evaluatorRecovered(final String id);
+  public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String 
evaluatorId) {
+    if (!this.previousEvaluators.contains(evaluatorId)) {
+      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " 
not expected to be alive.";
+      LOG.log(Level.SEVERE, errMsg);
+      throw new DriverFatalRuntimeException(errMsg);
+    }
+
+    if (!this.recoveredEvaluators.add(evaluatorId)) {
+      LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " 
added to the set" +
+          " of recovered evaluators more than once. Ignoring second add...");
+    }
+
+    if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
+      this.state = DriverRestartState.RestartCompleted;
+    }
+
+    return this.state == DriverRestartState.RestartCompleted;
+  }
 
   /**
    * Records the evaluators when it is allocated. The implementation depends 
on the runtime.
    * @param id The evaluator ID of the allocated evaluator.
    */
-  void recordAllocatedEvaluator(final String id);
+  public synchronized void recordAllocatedEvaluator(final String id) {
+    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
+  }
 
   /**
    * Records a removed evaluator into the evaluator log. The implementation 
depends on the runtime.
    * @param id The evaluator ID of the removed evaluator.
    */
-  void recordRemovedEvaluator(final String id);
+  public synchronized void recordRemovedEvaluator(final String id) {
+    driverRuntimeRestartManager.recordRemovedEvaluator(id);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
deleted file mode 100644
index 4ea3b56..0000000
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.driver.restart;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.exception.DriverFatalRuntimeException;
-
-import javax.inject.Inject;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The implementation of DriverRestartManager. A few methods here are proxy 
methods for
- * the DriverRuntimeRestartManager that depends on the runtime implementation.
- */
-@DriverSide
-@Private
-@Unstable
-public final class DriverRestartManagerImpl implements DriverRestartManager {
-  private static final Logger LOG = 
Logger.getLogger(DriverRestartManagerImpl.class.getName());
-  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
-  private final Set<String> previousEvaluators;
-  private final Set<String> recoveredEvaluators;
-  private DriverRestartState state;
-
-  @Inject
-  private DriverRestartManagerImpl(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
-    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
-    this.state = DriverRestartState.NotRestarted;
-    this.previousEvaluators = new HashSet<>();
-    this.recoveredEvaluators = new HashSet<>();
-  }
-
-  @Override
-  public synchronized boolean isRestart() {
-    if (this.state.isRestart()) {
-      return true;
-    }
-
-    if (driverRuntimeRestartManager.isRestart()) {
-      this.state = DriverRestartState.RestartBegan;
-      return true;
-    }
-
-    return false;
-  }
-
-  @Override
-  public synchronized void onRestart() {
-    final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
-    setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
-    
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
-  }
-
-  @Override
-  public synchronized boolean isRestartCompleted() {
-    return this.state == DriverRestartState.RestartCompleted;
-  }
-
-  @Override
-  public synchronized Set<String> getPreviousEvaluatorIds() {
-    return Collections.unmodifiableSet(this.previousEvaluators);
-  }
-
-  @Override
-  public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
-    if (this.state != DriverRestartState.RestartInProgress) {
-      previousEvaluators.addAll(ids);
-      this.state = DriverRestartState.RestartInProgress;
-    } else {
-      final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
-      LOG.log(Level.SEVERE, errMsg);
-      throw new DriverFatalRuntimeException(errMsg);
-    }
-  }
-
-  @Override
-  public synchronized Set<String> getRecoveredEvaluatorIds() {
-    return Collections.unmodifiableSet(this.previousEvaluators);
-  }
-
-  @Override
-  public synchronized boolean evaluatorRecovered(final String evaluatorId) {
-    if (!this.previousEvaluators.contains(evaluatorId)) {
-      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " 
not expected to be alive.";
-      LOG.log(Level.SEVERE, errMsg);
-      throw new DriverFatalRuntimeException(errMsg);
-    }
-
-    if (!this.recoveredEvaluators.add(evaluatorId)) {
-      LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " 
added to the set" +
-          " of recovered evaluators more than once. Ignoring second add...");
-    }
-
-    if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
-      this.state = DriverRestartState.RestartCompleted;
-    }
-
-    return this.state == DriverRestartState.RestartCompleted;
-  }
-
-  @Override
-  public void recordAllocatedEvaluator(final String id) {
-    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
-  }
-
-  @Override
-  public void recordRemovedEvaluator(final String id) {
-    driverRuntimeRestartManager.recordRemovedEvaluator(id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
index dcf753b..469a4d1 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
@@ -21,10 +21,11 @@ package org.apache.reef.driver.restart;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.util.Optional;
 
 /**
  * A static utilities class for simplifying calls to driver restart manager.
+ * Functions here should always call driverRestartManager.canRestart() before 
performing any
+ * actual options.
  */
 @Private
 @DriverSide
@@ -33,19 +34,6 @@ public final class DriverRestartUtilities {
 
   /**
    * Helper function for driver restart to determine whether an evaluator ID 
is from an evaluator from the
-   * previous application attempt. DriverRestartManager is optional here.
-   */
-  public static boolean isRestartAndIsPreviousEvaluator(final 
Optional<DriverRestartManager> driverRestartManager,
-                                                        final String 
evaluatorId) {
-    if (!driverRestartManager.isPresent()) {
-      return false;
-    }
-
-    return isRestartAndIsPreviousEvaluator(driverRestartManager.get(), 
evaluatorId);
-  }
-
-  /**
-   * Helper function for driver restart to determine whether an evaluator ID 
is from an evaluator from the
    * previous application attempt.
    */
   public static boolean isRestartAndIsPreviousEvaluator(final 
DriverRestartManager driverRestartManager,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
index c581e93..607a031 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.tang.annotations.DefaultImplementation;
 
 import java.util.Set;
 
@@ -34,9 +35,10 @@ import java.util.Set;
 @Private
 @RuntimeAuthor
 @Unstable
+@DefaultImplementation(DefaultDriverRuntimeRestartMangerImpl.class)
 public interface DriverRuntimeRestartManager {
   /**
-   * Determines whether or not the driver has been restarted.
+   * Determines whether or not the driver has been restarted. The default 
implementation always returns false.
    */
   boolean isRestart();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index cbac9ea..0db7a54 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -43,7 +43,6 @@ public final class DriverRuntimeRestartConfiguration extends 
ConfigurationModule
       // Automatically sets preserve evaluators to true.
       .bindNamedParameter(ResourceManagerPreserveEvaluators.class, 
Boolean.toString(true))
 
-      .bindImplementation(DriverRestartManager.class, 
DriverRestartManagerImpl.class)
       .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, 
EvaluatorPreservingEvaluatorAllocatedHandler.class)
       .bindSetEntry(ServiceEvaluatorFailedHandlers.class, 
EvaluatorPreservingEvaluatorFailedHandler.class)
       .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, 
EvaluatorPreservingEvaluatorCompletedHandler.class)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 2811497..62e53b7 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -23,7 +23,6 @@ import 
org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.util.Optional;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.time.event.StartTime;
 
@@ -39,48 +38,31 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
   private static final Logger LOG = 
Logger.getLogger(DriverStartHandler.class.getName());
 
   private final Set<EventHandler<StartTime>> startHandlers;
-  private final Optional<Set<EventHandler<StartTime>>> restartHandlers;
-  private final Optional<Set<EventHandler<StartTime>>> serviceRestartHandlers;
-  private final Optional<DriverRestartManager> driverRestartManager;
+  private final Set<EventHandler<StartTime>> restartHandlers;
+  private final Set<EventHandler<StartTime>> serviceRestartHandlers;
+  private final DriverRestartManager driverRestartManager;
 
   @Inject
   
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
-                     final Set<EventHandler<StartTime>> startHandler,
+                     final Set<EventHandler<StartTime>> startHandlers,
                      @Parameter(DriverRestartHandler.class)
                      final Set<EventHandler<StartTime>> restartHandlers,
                      @Parameter(ServiceDriverRestartedHandlers.class)
                      final Set<EventHandler<StartTime>> serviceRestartHandlers,
                      final DriverRestartManager driverRestartManager) {
-    this(startHandler, Optional.of(restartHandlers), 
Optional.of(serviceRestartHandlers),
-        Optional.of(driverRestartManager));
-    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers 
[{0}], RestartHandlers [{1}]," +
-            "and ServiceRestartHandlers [{2}], with a restart manager.",
-        new String[] {this.startHandlers.toString(), 
this.restartHandlers.toString(),
-            this.serviceRestartHandlers.toString()});
-  }
-
-  @Inject
-  
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
-                     final Set<EventHandler<StartTime>> startHandlers) {
-    this(startHandlers, Optional.<Set<EventHandler<StartTime>>>empty(),
-        Optional.<Set<EventHandler<StartTime>>>empty(), 
Optional.<DriverRestartManager>empty());
-    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers 
[{0}] and no restart.",
-        this.startHandlers.toString());
-  }
-
-  private DriverStartHandler(final Set<EventHandler<StartTime>> startHandler,
-                             final Optional<Set<EventHandler<StartTime>>> 
restartHandlers,
-                             final Optional<Set<EventHandler<StartTime>>> 
serviceRestartHandlers,
-                             final Optional<DriverRestartManager> 
driverRestartManager) {
-    this.startHandlers = startHandler;
+    this.startHandlers = startHandlers;
     this.restartHandlers = restartHandlers;
     this.serviceRestartHandlers = serviceRestartHandlers;
     this.driverRestartManager = driverRestartManager;
+    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers 
[{0}], RestartHandlers [{1}]," +
+            "and ServiceRestartHandlers [{2}].",
+        new String[] {this.startHandlers.toString(), 
this.restartHandlers.toString(),
+            this.serviceRestartHandlers.toString()});
   }
 
   @Override
   public void onNext(final StartTime startTime) {
-    if (isRestart()) {
+    if (this.driverRestartManager.isRestart()) {
       this.onRestart(startTime);
     } else {
       this.onStart(startTime);
@@ -88,18 +70,18 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
   }
 
   private void onRestart(final StartTime startTime) {
-    if (this.driverRestartManager.isPresent() && 
this.restartHandlers.isPresent()) {
-      for (EventHandler<StartTime> serviceRestartHandler : 
this.serviceRestartHandlers.get()) {
+    if (this.restartHandlers.size() > 0) {
+      for (EventHandler<StartTime> serviceRestartHandler : 
this.serviceRestartHandlers) {
         serviceRestartHandler.onNext(startTime);
       }
 
-      for (EventHandler<StartTime> restartHandler : 
this.restartHandlers.get()){
+      for (EventHandler<StartTime> restartHandler : this.restartHandlers){
         restartHandler.onNext(startTime);
       }
 
       // This can only be called after calling client restart handlers because 
REEF.NET
       // JobDriver requires making this call to set up the InterOp handlers.
-      this.driverRestartManager.get().onRestart();
+      this.driverRestartManager.onRestart();
     } else {
       throw new DriverFatalRuntimeException("Driver restart happened, but no 
ON_DRIVER_RESTART handler is bound.");
     }
@@ -110,15 +92,4 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
       startHandler.onNext(startTime);
     }
   }
-
-  /**
-   * @return true, if the configurations enable restart and the Driver is in 
fact being restarted.
-   */
-  private boolean isRestart() {
-    if (this.driverRestartManager.isPresent()) {
-      return this.driverRestartManager.get().isRestart();
-    }
-
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index a8d538e..41bdd43 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -45,7 +45,7 @@ public final class ContextRepresenters {
 
   private final EvaluatorMessageDispatcher messageDispatcher;
   private final ContextFactory contextFactory;
-  private final Optional<DriverRestartManager> driverRestartManager;
+  private final DriverRestartManager driverRestartManager;
 
   // Mutable fields
   @GuardedBy("this")
@@ -55,20 +55,8 @@ public final class ContextRepresenters {
 
   @Inject
   private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
-                              final ContextFactory contextFactory) {
-    this(messageDispatcher, contextFactory, 
Optional.<DriverRestartManager>empty());
-  }
-
-  @Inject
-  private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
                               final ContextFactory contextFactory,
                               final DriverRestartManager driverRestartManager) 
{
-    this(messageDispatcher, contextFactory, Optional.of(driverRestartManager));
-  }
-
-  private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
-                              final ContextFactory contextFactory,
-                              final Optional<DriverRestartManager> 
driverRestartManager) {
     this.messageDispatcher = messageDispatcher;
     this.contextFactory = contextFactory;
     this.driverRestartManager = driverRestartManager;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 032c0cb..12060dd 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.evaluator.CLRProcessFactory;
+import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.driver.restart.DriverRestartUtilities;
 import org.apache.reef.tang.ConfigurationProvider;
@@ -29,7 +30,6 @@ import org.apache.reef.driver.context.FailedContext;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
 import org.apache.reef.driver.evaluator.JVMProcessFactory;
-import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
 import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.exception.EvaluatorException;
 import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
@@ -104,7 +104,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
   private final JVMProcessFactory jvmProcessFactory;
   private final CLRProcessFactory clrProcessFactory;
-  private final Optional<DriverRestartManager> driverRestartManager;
+  private final DriverRestartManager driverRestartManager;
 
   // Mutable fields
   private Optional<TaskRepresenter> task = Optional.empty();
@@ -130,63 +130,10 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       final LoggingScopeFactory loggingScopeFactory,
       @Parameter(EvaluatorConfigurationProviders.class)
       final Set<ConfigurationProvider> evaluatorConfigurationProviders,
-      final JVMProcessFactory jvmProcessFactory,
-      final CLRProcessFactory clrProcessFactory,
-      final DriverRestartManager driverRestartManager) {
-    this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler, 
evaluatorId, evaluatorDescriptor,
-        contextRepresenters, configurationSerializer, messageDispatcher, 
evaluatorControlHandler,
-        contextControlHandler, stateManager, exceptionCodec, idlenessSource, 
loggingScopeFactory,
-        evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory, 
Optional.of(driverRestartManager));
-  }
-
-  @Inject
-  private EvaluatorManager(
-      final Clock clock,
-      final RemoteManager remoteManager,
-      final ResourceReleaseHandler resourceReleaseHandler,
-      final ResourceLaunchHandler resourceLaunchHandler,
-      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
-      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl 
evaluatorDescriptor,
-      final ContextRepresenters contextRepresenters,
-      final ConfigurationSerializer configurationSerializer,
-      final EvaluatorMessageDispatcher messageDispatcher,
-      final EvaluatorControlHandler evaluatorControlHandler,
-      final ContextControlHandler contextControlHandler,
-      final EvaluatorStatusManager stateManager,
-      final ExceptionCodec exceptionCodec,
-      final EventHandlerIdlenessSource idlenessSource,
-      final LoggingScopeFactory loggingScopeFactory,
-      @Parameter(EvaluatorConfigurationProviders.class)
-      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
-      final JVMProcessFactory jvmProcessFactory,
-      final CLRProcessFactory clrProcessFactory) {
-    this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler, 
evaluatorId, evaluatorDescriptor,
-        contextRepresenters, configurationSerializer, messageDispatcher, 
evaluatorControlHandler,
-        contextControlHandler, stateManager, exceptionCodec, idlenessSource, 
loggingScopeFactory,
-        evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory, 
Optional.<DriverRestartManager>empty());
-  }
-
-  private EvaluatorManager(
-      final Clock clock,
-      final RemoteManager remoteManager,
-      final ResourceReleaseHandler resourceReleaseHandler,
-      final ResourceLaunchHandler resourceLaunchHandler,
-      final String evaluatorId,
-      final EvaluatorDescriptorImpl evaluatorDescriptor,
-      final ContextRepresenters contextRepresenters,
-      final ConfigurationSerializer configurationSerializer,
-      final EvaluatorMessageDispatcher messageDispatcher,
-      final EvaluatorControlHandler evaluatorControlHandler,
-      final ContextControlHandler contextControlHandler,
-      final EvaluatorStatusManager stateManager,
-      final ExceptionCodec exceptionCodec,
-      final EventHandlerIdlenessSource idlenessSource,
-      final LoggingScopeFactory loggingScopeFactory,
-      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
       // TODO: Eventually remove the factories when they are removed from 
AllocatedEvaluatorImpl
       final JVMProcessFactory jvmProcessFactory,
       final CLRProcessFactory clrProcessFactory,
-      final Optional<DriverRestartManager> driverRestartManager) {
+      final DriverRestartManager driverRestartManager) {
     this.contextRepresenters = contextRepresenters;
     this.idlenessSource = idlenessSource;
     LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: 
{0}", evaluatorId);
@@ -396,7 +343,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         this.evaluatorControlHandler.setRemoteID(evaluatorRID);
         this.stateManager.setRunning();
 
-        boolean restartCompleted = 
this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId);
+        final boolean restartCompleted =
+            
this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId);
 
         LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", 
this.evaluatorId);
 
@@ -405,8 +353,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
           LOG.log(Level.INFO, "All expected evaluators checked in.");
         } else {
           LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
-              new 
Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(),
-                  this.driverRestartManager.get().getRecoveredEvaluatorIds()});
+              new Object[]{this.driverRestartManager.getPreviousEvaluatorIds(),
+                  this.driverRestartManager.getRecoveredEvaluatorIds()});
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 75dd24e..5436494 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -49,7 +49,7 @@ public final class TaskRepresenter {
   private final EvaluatorManager evaluatorManager;
   private final ExceptionCodec exceptionCodec;
   private final String taskId;
-  private final Optional<DriverRestartManager> driverRestartManager;
+  private final DriverRestartManager driverRestartManager;
 
   // Mutable state
   private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
@@ -59,7 +59,7 @@ public final class TaskRepresenter {
                          final EvaluatorMessageDispatcher messageDispatcher,
                          final EvaluatorManager evaluatorManager,
                          final ExceptionCodec exceptionCodec,
-                         final Optional<DriverRestartManager> 
driverRestartManager) {
+                         final DriverRestartManager driverRestartManager) {
     this.taskId = taskId;
     this.context = context;
     this.messageDispatcher = messageDispatcher;


Reply via email to