Repository: incubator-reef
Updated Branches:
refs/heads/master 63e0c074c -> d09250e2e
[REEF-631] Implement driver restart completion logic
This addresses the issue by:
* Moving the restart completion logic from `EvaluatorManager` into
`DriverRestartManager.onRecoverEvaluator`.
* Implementing the actual restart completion logic, which marks Evaluators
that have not reported back as expired and calls `EvaluatorFailedHandler`
for them.
JIRA:
[REEF-631](https://issues.apache.org/jira/browse/REEF-631)
Pull Request:
This closes #415
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d09250e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d09250e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d09250e2
Branch: refs/heads/master
Commit: d09250e2e4dcd607b562f8f06cfdb50c35010c05
Parents: 63e0c07
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 25 22:26:19 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 26 14:56:29 2015 -0700
----------------------------------------------------------------------
.../driver/restart/DriverRestartManager.java | 75 +++++++++++++++++---
.../evaluator/EvaluatorHeartbeatHandler.java | 11 ++-
.../driver/evaluator/EvaluatorManager.java | 14 +---
3 files changed, 76 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d09250e2/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 6e422a1..19f2b64 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -21,7 +21,12 @@ package org.apache.reef.driver.restart;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
+import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
import org.apache.reef.exception.DriverFatalRuntimeException;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.util.*;
@@ -38,13 +43,21 @@ import java.util.logging.Logger;
public final class DriverRestartManager {
private static final Logger LOG =
Logger.getLogger(DriverRestartManager.class.getName());
private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+ private final Set<EventHandler<DriverRestartCompleted>>
driverRestartCompletedHandlers;
+ private final Set<EventHandler<DriverRestartCompleted>>
serviceDriverRestartCompletedHandlers;
private RestartEvaluators restartEvaluators;
private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
@Inject
- private DriverRestartManager(final DriverRuntimeRestartManager
driverRuntimeRestartManager) {
+ private DriverRestartManager(final DriverRuntimeRestartManager
driverRuntimeRestartManager,
+ @Parameter(DriverRestartCompletedHandlers.class)
+ final Set<EventHandler<DriverRestartCompleted>>
driverRestartCompletedHandlers,
+
@Parameter(ServiceDriverRestartCompletedHandlers.class)
+ final Set<EventHandler<DriverRestartCompleted>>
serviceDriverRestartCompletedHandlers) {
this.driverRuntimeRestartManager = driverRuntimeRestartManager;
+ this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
+ this.serviceDriverRestartCompletedHandlers =
serviceDriverRestartCompletedHandlers;
}
/**
@@ -93,7 +106,7 @@ public final class DriverRestartManager {
driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
- // TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on
a Timer.
+ // TODO[REEF-560]: Call onDriverRestartCompleted() on a Timer.
}
/**
@@ -110,11 +123,10 @@ public final class DriverRestartManager {
/**
* Indicate that this Driver has re-established the connection with one more
Evaluator of a previous run.
- * Calls the restart complete action if the latest evaluator is the last
evaluator to recover.
- * @return true if the driver restart is completed.
+ * @return true if the evaluator has been newly recovered.
*/
- public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String
evaluatorId) {
- if (getStateOfPreviousEvaluator(evaluatorId) ==
EvaluatorRestartState.NOT_EXPECTED) {
+ public synchronized boolean onRecoverEvaluator(final String evaluatorId) {
+ if (getStateOfPreviousEvaluator(evaluatorId).isFailedOrNotExpected()) {
final String errMsg = "Evaluator with evaluator ID " + evaluatorId + "
not expected to be alive.";
LOG.log(Level.SEVERE, errMsg);
throw new DriverFatalRuntimeException(errMsg);
@@ -123,11 +135,17 @@ public final class DriverRestartManager {
if (getStateOfPreviousEvaluator(evaluatorId) !=
EvaluatorRestartState.EXPECTED) {
LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + "
added to the set" +
" of recovered evaluators more than once. Ignoring second add...");
- } else {
- setEvaluatorReported(evaluatorId);
+ return false;
+ }
+
+ // set the status for this evaluator ID to be reported.
+ setEvaluatorReported(evaluatorId);
+
+ if (haveAllExpectedEvaluatorsReported()) {
+ onDriverRestartCompleted();
}
- return haveAllExpectedEvaluatorsReported();
+ return true;
}
/**
@@ -212,6 +230,45 @@ public final class DriverRestartManager {
return true;
}
+ /**
+ * Sets the driver restart status to be completed if not yet set and
notifies the restart completed event handlers.
+ */
+ private synchronized void onDriverRestartCompleted() {
+ if (this.state != DriverRestartState.COMPLETED) {
+ final Set<String> outstandingEvaluatorIds =
getOutstandingEvaluatorsAndMarkExpired();
+
driverRuntimeRestartManager.informAboutEvaluatorFailures(outstandingEvaluatorIds);
+
+ this.state = DriverRestartState.COMPLETED;
+ final DriverRestartCompleted driverRestartCompleted = new
DriverRestartCompleted(System.currentTimeMillis());
+
+ for (final EventHandler<DriverRestartCompleted>
serviceRestartCompletedHandler
+ : this.serviceDriverRestartCompletedHandlers) {
+ serviceRestartCompletedHandler.onNext(driverRestartCompleted);
+ }
+
+ for (final EventHandler<DriverRestartCompleted> restartCompletedHandler
: this.driverRestartCompletedHandlers) {
+ restartCompletedHandler.onNext(driverRestartCompleted);
+ }
+
+ LOG.log(Level.FINE, "Restart completed. Evaluators that have not
reported back are: " + outstandingEvaluatorIds);
+ }
+ }
+
+ /**
+ * Gets the outstanding evaluators that have not yet reported back and mark
them as expired.
+ */
+ private Set<String> getOutstandingEvaluatorsAndMarkExpired() {
+ final Set<String> outstanding = new HashSet<>();
+ for (final String previousEvaluatorId :
restartEvaluators.getEvaluatorIds()) {
+ if (getStateOfPreviousEvaluator(previousEvaluatorId) ==
EvaluatorRestartState.EXPECTED) {
+ outstanding.add(previousEvaluatorId);
+ setEvaluatorExpired(previousEvaluatorId);
+ }
+ }
+
+ return outstanding;
+ }
+
private Set<String> getFailedEvaluators() {
final Set<String> failed = new HashSet<>();
for (final String previousEvaluatorId :
this.restartEvaluators.getEvaluatorIds()) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d09250e2/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index 3d02ead..ee1af9f 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -73,13 +73,20 @@ public final class EvaluatorHeartbeatHandler
if (driverRestartManager.isRestarting() &&
driverRestartManager.getEvaluatorRestartState(evaluatorId) ==
EvaluatorRestartState.EXPECTED) {
- // TODO[REEF-617]: Create EvaluatorManager for recovered evaluator and
call onEvaluatorHeartbeatMessage().
+
+ if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) {
+ LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported
back to the driver after restart.");
+ // TODO[REEF-617]: Create EvaluatorManager, add to this.evaluators,
and call onEvaluatorHeartbeatMessage().
+
+ } else {
+ LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already
been recovered.");
+ }
return;
}
if (driverRestartManager.getEvaluatorRestartState(evaluatorId) ==
EvaluatorRestartState.EXPIRED) {
LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has
reported back to the driver after restart.");
- // TODO[REEF-617]: Create EvaluatorManager for expired evaluator and
close it.
+ // TODO[REEF-617]: Create EvaluatorManager, call
onEvaluatorHeartbeatMessage, and close it.
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d09250e2/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 116a71d..bc8ebbf 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -34,7 +34,6 @@ import
org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
@@ -355,18 +354,7 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
-
- // TODO[REEF-617]: Move evaluator recovery to
EvaluatorHeartbeatHandler and reregister evaluator here.
- final boolean restartCompleted =
-
this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId);
-
- LOG.log(Level.FINE, "Received recovery heartbeat from evaluator
{0}.", this.evaluatorId);
-
- // TODO[REEF-617]: Move restart completion logic to
DriverRestartManager.
- if (restartCompleted) {
- this.messageDispatcher.onDriverRestartCompleted(new
DriverRestartCompleted(System.currentTimeMillis()));
- LOG.log(Level.INFO, "All expected evaluators checked in.");
- }
+ driverRestartManager.setEvaluatorReregistered(evaluatorId);
}
}