Repository: incubator-reef
Updated Branches:
  refs/heads/master fd18d2f40 -> 46de9e70a


[REEF-506] Move restart determination from Evaluator to Driver

This change makes sure that the Driver calls restart handlers by checking the
restart status based on DriverRestartManager instead of using the protobuf
message from the recovered evaluator.

JIRA:
  [REEF-506](https://issues.apache.org/jira/browse/REEF-506)

Pull Request:
  This closes #364


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/46de9e70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/46de9e70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/46de9e70

Branch: refs/heads/master
Commit: 46de9e70a09d0dd54340ba4d5f41718959564525
Parents: fd18d2f
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 11 13:54:36 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 12 13:59:26 2015 -0700

----------------------------------------------------------------------
 .../restart/DriverRestartManagerImpl.java       | 33 +++++----
 .../reef/driver/restart/DriverRestartState.java | 71 ++++++++++++++++++++
 .../driver/restart/DriverRestartUtilities.java  | 58 ++++++++++++++++
 .../driver/context/ContextRepresenters.java     | 21 +++++-
 .../driver/evaluator/EvaluatorManager.java      | 43 ++++++------
 .../common/driver/task/TaskRepresenter.java     |  9 ++-
 6 files changed, 194 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
index 2b07027..4ea3b56 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
@@ -42,34 +42,40 @@ public final class DriverRestartManagerImpl implements 
DriverRestartManager {
   private final DriverRuntimeRestartManager driverRuntimeRestartManager;
   private final Set<String> previousEvaluators;
   private final Set<String> recoveredEvaluators;
-
-  private boolean restartBegan;
-  private boolean restartCompleted;
+  private DriverRestartState state;
 
   @Inject
   private DriverRestartManagerImpl(final DriverRuntimeRestartManager 
driverRuntimeRestartManager) {
     this.driverRuntimeRestartManager = driverRuntimeRestartManager;
-    this.restartCompleted = false;
-    this.restartBegan = false;
+    this.state = DriverRestartState.NotRestarted;
     this.previousEvaluators = new HashSet<>();
     this.recoveredEvaluators = new HashSet<>();
   }
 
   @Override
-  public boolean isRestart() {
-    return driverRuntimeRestartManager.isRestart();
+  public synchronized boolean isRestart() {
+    if (this.state.isRestart()) {
+      return true;
+    }
+
+    if (driverRuntimeRestartManager.isRestart()) {
+      this.state = DriverRestartState.RestartBegan;
+      return true;
+    }
+
+    return false;
   }
 
   @Override
-  public void onRestart() {
+  public synchronized void onRestart() {
     final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
     setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
     
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
   }
 
   @Override
-  public boolean isRestartCompleted() {
-    return this.restartCompleted;
+  public synchronized boolean isRestartCompleted() {
+    return this.state == DriverRestartState.RestartCompleted;
   }
 
   @Override
@@ -79,8 +85,9 @@ public final class DriverRestartManagerImpl implements 
DriverRestartManager {
 
   @Override
   public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
-    if (!this.restartBegan) {
+    if (this.state != DriverRestartState.RestartInProgress) {
       previousEvaluators.addAll(ids);
+      this.state = DriverRestartState.RestartInProgress;
     } else {
       final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
       LOG.log(Level.SEVERE, errMsg);
@@ -107,10 +114,10 @@ public final class DriverRestartManagerImpl implements 
DriverRestartManager {
     }
 
     if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
-      this.restartCompleted = true;
+      this.state = DriverRestartState.RestartCompleted;
     }
 
-    return this.restartCompleted;
+    return this.state == DriverRestartState.RestartCompleted;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
new file mode 100644
index 0000000..04c711f
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Represents the current driver restart progress.
+ */
+@Private
+@DriverSide
+@Unstable
+public enum DriverRestartState {
+  /**
+   *  Driver restart is not implemented.
+   */
+  NotImplemented,
+
+  /**
+   *  Driver has not begun the restart process yet.
+   */
+  NotRestarted,
+
+  /**
+   * Driver has been notified of the restart by the runtime, but has not yet
+   * received its set of evaluator IDs to recover.
+   */
+  RestartBegan,
+
+  /**
+   * Driver has received its set of evaluator IDs to recover.
+   */
+  RestartInProgress,
+
+  /**
+   * Driver has recovered all the evaluator IDs that it can, and the restart 
process is completed.
+   */
+  RestartCompleted;
+
+  /**
+   * Returns true if the restart process has begun.
+   */
+  public boolean isRestart() {
+    switch (this) {
+    case RestartBegan:
+    case RestartInProgress:
+    case RestartCompleted:
+      return true;
+    default:
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
new file mode 100644
index 0000000..dcf753b
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+/**
+ * A static utilities class for simplifying calls to driver restart manager.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class DriverRestartUtilities {
+
+  /**
+   * Helper function for driver restart to determine whether an evaluator ID 
is from an evaluator from the
+   * previous application attempt. DriverRestartManager is optional here.
+   */
+  public static boolean isRestartAndIsPreviousEvaluator(final 
Optional<DriverRestartManager> driverRestartManager,
+                                                        final String 
evaluatorId) {
+    if (!driverRestartManager.isPresent()) {
+      return false;
+    }
+
+    return isRestartAndIsPreviousEvaluator(driverRestartManager.get(), 
evaluatorId);
+  }
+
+  /**
+   * Helper function for driver restart to determine whether an evaluator ID 
is from an evaluator from the
+   * previous application attempt.
+   */
+  public static boolean isRestartAndIsPreviousEvaluator(final 
DriverRestartManager driverRestartManager,
+                                                        final String 
evaluatorId) {
+    return driverRestartManager.isRestart() && 
driverRestartManager.getPreviousEvaluatorIds().contains(evaluatorId);
+  }
+
+  private DriverRestartUtilities() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index ba72ad9..a8d538e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -23,6 +23,8 @@ import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.DriverRestartUtilities;
 import org.apache.reef.proto.ReefServiceProtos;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
 import org.apache.reef.util.Optional;
@@ -43,6 +45,7 @@ public final class ContextRepresenters {
 
   private final EvaluatorMessageDispatcher messageDispatcher;
   private final ContextFactory contextFactory;
+  private final Optional<DriverRestartManager> driverRestartManager;
 
   // Mutable fields
   @GuardedBy("this")
@@ -53,8 +56,22 @@ public final class ContextRepresenters {
   @Inject
   private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
                               final ContextFactory contextFactory) {
+    this(messageDispatcher, contextFactory, 
Optional.<DriverRestartManager>empty());
+  }
+
+  @Inject
+  private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
+                              final ContextFactory contextFactory,
+                              final DriverRestartManager driverRestartManager) 
{
+    this(messageDispatcher, contextFactory, Optional.of(driverRestartManager));
+  }
+
+  private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
+                              final ContextFactory contextFactory,
+                              final Optional<DriverRestartManager> 
driverRestartManager) {
     this.messageDispatcher = messageDispatcher;
     this.contextFactory = contextFactory;
+    this.driverRestartManager = driverRestartManager;
   }
 
   /**
@@ -91,7 +108,7 @@ public final class ContextRepresenters {
   /**
    * Process heartbeats from the contexts on an Evaluator.
    *
-   * @param contextStatusProto
+   * @param contextStatusProtos
    * @param notifyClientOnNewActiveContext
    */
   public synchronized void onContextStatusMessages(final 
Iterable<ReefServiceProtos.ContextStatusProto>
@@ -210,7 +227,7 @@ public final class ContextRepresenters {
         Optional.of(contextStatusProto.getParentId()) : 
Optional.<String>empty();
     final EvaluatorContext context = contextFactory.newContext(contextID, 
parentID);
     this.addContext(context);
-    if (contextStatusProto.getRecovery()) {
+    if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
context.getEvaluatorId())) {
       // when we get a recovered active context, always notify application
       this.messageDispatcher.onDriverRestartContextActive(context);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index d546e59..032c0cb 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.evaluator.CLRProcessFactory;
 import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.exception.DriverFatalRuntimeException;
+import org.apache.reef.driver.restart.DriverRestartUtilities;
 import org.apache.reef.tang.ConfigurationProvider;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.context.FailedContext;
@@ -392,27 +392,21 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
 
       // first message from a running evaluator trying to re-establish 
communications
-      if (evaluatorHeartbeatProto.getRecovery()) {
-        if(this.driverRestartManager.isPresent()) {
-          this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-          this.stateManager.setRunning();
-
-          boolean restartCompleted = 
this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId);
-
-          LOG.log(Level.FINE, "Received recovery heartbeat from evaluator 
{0}.", this.evaluatorId);
-
-          if (restartCompleted) {
-            this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
-            LOG.log(Level.INFO, "All expected evaluators checked in.");
-          } else {
-            LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
-                new 
Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(),
-                    
this.driverRestartManager.get().getRecoveredEvaluatorIds()});
-          }
+      if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
evaluatorId)) {
+        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+        this.stateManager.setRunning();
+
+        boolean restartCompleted = 
this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId);
+
+        LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", 
this.evaluatorId);
+
+        if (restartCompleted) {
+          this.messageDispatcher.onDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
+          LOG.log(Level.INFO, "All expected evaluators checked in.");
         } else {
-          final String errorMsg = "Restart configurations are not set 
properly. The DriverRestartManager is missing.";
-          LOG.log(Level.SEVERE, errorMsg);
-          throw new DriverFatalRuntimeException(errorMsg);
+          LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
+              new 
Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(),
+                  this.driverRestartManager.get().getRecoveredEvaluatorIds()});
         }
       }
 
@@ -547,8 +541,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
           taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
           taskStatusProto.getState() == ReefServiceProtos.State.RUNNING ||
-          taskStatusProto.getRecovery() // for task from recovered evaluators
-          ) {
+          // for task from recovered evaluators
+          
DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
evaluatorId)) {
 
         // [REEF-308] exposes a bug where the .NET evaluator does not send its 
states in the right order
         // [REEF-289] is a related item which may fix the issue
@@ -565,7 +559,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
                 
this.contextRepresenters.getContext(taskStatusProto.getContextId()),
                 this.messageDispatcher,
                 this,
-                this.exceptionCodec));
+                this.exceptionCodec,
+                this.driverRestartManager));
       } else {
         throw new RuntimeException("Received a message of state " + 
taskStatusProto.getState() +
             ", not INIT, RUNNING, or FAILED for Task " + 
taskStatusProto.getTaskId() +

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 6bfee45..75dd24e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -21,6 +21,8 @@ package org.apache.reef.runtime.common.driver.task;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.DriverRestartUtilities;
 import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.proto.ReefServiceProtos;
@@ -47,6 +49,7 @@ public final class TaskRepresenter {
   private final EvaluatorManager evaluatorManager;
   private final ExceptionCodec exceptionCodec;
   private final String taskId;
+  private final Optional<DriverRestartManager> driverRestartManager;
 
   // Mutable state
   private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
@@ -55,12 +58,14 @@ public final class TaskRepresenter {
                          final EvaluatorContext context,
                          final EvaluatorMessageDispatcher messageDispatcher,
                          final EvaluatorManager evaluatorManager,
-                         final ExceptionCodec exceptionCodec) {
+                         final ExceptionCodec exceptionCodec,
+                         final Optional<DriverRestartManager> 
driverRestartManager) {
     this.taskId = taskId;
     this.context = context;
     this.messageDispatcher = messageDispatcher;
     this.evaluatorManager = evaluatorManager;
     this.exceptionCodec = exceptionCodec;
+    this.driverRestartManager = driverRestartManager;
   }
 
   private static byte[] getResult(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
@@ -134,7 +139,7 @@ public final class TaskRepresenter {
     }
 
     // fire driver restart task running handler if this is a recovery heartbeat
-    if (taskStatusProto.getRecovery()) {
+    if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, 
evaluatorManager.getId())) {
       final RunningTask runningTask = new RunningTaskImpl(
           this.evaluatorManager, this.taskId, this.context, this);
       this.messageDispatcher.onDriverRestartTaskRunning(runningTask);

Reply via email to