Repository: incubator-reef
Updated Branches:
refs/heads/master 35674a3df -> 1dc1cf3a5
[REEF-559] Tighten previous evaluator ID checks by using the entire set of
evaluator IDs
This addresses the issue by:
* Keeping the set of evaluator IDs that are expected to report back to
the driver on restart, as well as the set of evaluator IDs that have
reported back.
* Using these sets to determine whether the restart has completed.
JIRA:
[REEF-559](https://issues.apache.org/jira/browse/REEF-559)
Pull Request:
This closes #351
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1dc1cf3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1dc1cf3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1dc1cf3a
Branch: refs/heads/master
Commit: 1dc1cf3a55335a7948b2602ab5eb2b3bce947726
Parents: 35674a3
Author: Andrew Chung <[email protected]>
Authored: Fri Aug 7 11:33:33 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 7 18:02:22 2015 -0700
----------------------------------------------------------------------
.../driver/restart/DriverRestartManager.java | 25 ++++----
.../restart/DriverRestartManagerImpl.java | 64 ++++++++++++--------
.../driver/evaluator/EvaluatorManager.java | 21 +++----
3 files changed, 59 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1dc1cf3a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 99bd7b0..57c31c8 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -22,6 +22,8 @@ import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import java.util.Set;
+
/**
* The manager that handles aspects of driver restart such as determining
whether the driver is in
* restart mode, what to do on restart, whether restart is completed, and
others.
@@ -44,35 +46,34 @@ public interface DriverRestartManager {
void onRestart();
/**
- * Indicate that the Driver restart is complete. It is meant to be called exactly once during a restart and never
- * during the ininital launch of a Driver.
+ * Check whether every Evaluator expected to survive the driver restart has checked back in.
+ *
+ * @return true if the driver restart is completed, false otherwise.
*/
- void setRestartCompleted();
+ boolean isRestartCompleted();
/**
- * @return the number of Evaluators expected to check in from a previous run.
+ * @return the IDs of the Evaluators expected to check in from a previous run.
*/
- int getNumPreviousContainers();
+ Set<String> getPreviousEvaluatorIds();
/**
- * Set the number of containers to expect still active from a previous execution of the Driver in a restart situation.
+ * Set the IDs of the Evaluators expected to still be active from a previous execution of the Driver
+ * in a restart situation.
* To be called exactly once during a driver restart.
*
- * @param num
+ * @param ids the IDs of the Evaluators that are expected to have survived the driver restart.
*/
- void setNumPreviousContainers(final int num);
+ void setPreviousEvaluatorIds(final Set<String> ids);
/**
- * @return the number of Evaluators from a previous Driver that have checked in with the Driver
+ * @return the IDs of the Evaluators from a previous Driver that have checked in with the Driver
* in a restart situation.
*/
- int getNumRecoveredContainers();
+ Set<String> getRecoveredEvaluatorIds();
/**
* Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
+ *
+ * @param id the ID of the Evaluator that has re-established the connection.
+ * @return true if the driver restart is completed.
*/
- void oneContainerRecovered();
+ boolean evaluatorRecovered(final String id);
/**
* Records the evaluators when it is allocated. The implementation depends
on the runtime.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1dc1cf3a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
index 0d7a75a..2b07027 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
@@ -21,8 +21,12 @@ package org.apache.reef.driver.restart;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.exception.DriverFatalRuntimeException;
import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,17 +40,19 @@ import java.util.logging.Logger;
public final class DriverRestartManagerImpl implements DriverRestartManager {
private static final Logger LOG =
Logger.getLogger(DriverRestartManagerImpl.class.getName());
private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+ // IDs of the evaluators expected to report back after a driver restart.
+ private final Set<String> previousEvaluators;
+ // IDs of the expected evaluators that have reported back so far.
+ private final Set<String> recoveredEvaluators;
+ // Checked by setPreviousEvaluatorIds to reject setting the expected IDs more than once.
+ private boolean restartBegan;
+ // Set to true once every expected evaluator has reported back.
private boolean restartCompleted;
- private int numPreviousContainers;
- private int numRecoveredContainers;
+ /**
+ * Creates a restart manager with no expected or recovered evaluators recorded yet.
+ *
+ * @param driverRuntimeRestartManager queried on restart for the alive and failed evaluators.
+ */
@Inject
private DriverRestartManagerImpl(final DriverRuntimeRestartManager
driverRuntimeRestartManager) {
this.driverRuntimeRestartManager = driverRuntimeRestartManager;
this.restartCompleted = false;
- this.numPreviousContainers = -1;
- this.numRecoveredContainers = 0;
+ this.restartBegan = false;
+ this.previousEvaluators = new HashSet<>();
+ this.recoveredEvaluators = new HashSet<>();
}
@Override
@@ -57,46 +63,54 @@ public final class DriverRestartManagerImpl implements
DriverRestartManager {
@Override
public void onRestart() {
final EvaluatorRestartInfo evaluatorRestartInfo =
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
- setNumPreviousContainers(evaluatorRestartInfo.getAliveEvaluators().size());
+ // Record the IDs (not just the count) of the alive evaluators so each one can be checked off as it reports back.
+ setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
}
@Override
- public synchronized void setRestartCompleted() {
- if (this.restartCompleted) {
- LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
- } else {
- this.restartCompleted = true;
- }
+ // Synchronized like the other accessors: restartCompleted is written under this object's lock in
+ // evaluatorRecovered, and an unsynchronized read is not guaranteed to observe that write.
+ public synchronized boolean isRestartCompleted() {
+ return this.restartCompleted;
}
@Override
- public synchronized int getNumPreviousContainers() {
- return this.numPreviousContainers;
+ public synchronized Set<String> getPreviousEvaluatorIds() {
+ // Return a read-only snapshot taken under the lock; an unmodifiable view would still be backed by
+ // the guarded set and could be iterated (e.g. logged) by callers outside the lock.
+ return Collections.unmodifiableSet(new HashSet<>(this.previousEvaluators));
}
@Override
- public synchronized void setNumPreviousContainers(final int num) {
- if (this.numPreviousContainers >= 0) {
- throw new IllegalStateException("Attempting to set the number of expected containers left " +
- "from a previous container more than once.");
+ public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
+ if (!this.restartBegan) {
+ previousEvaluators.addAll(ids);
+ // Freeze the expected set so that any later call takes the error branch below.
+ this.restartBegan = true;
} else {
- this.numPreviousContainers = num;
+ final String errMsg = "Should not be setting the set of expected alive evaluators more than once.";
+ LOG.log(Level.SEVERE, errMsg);
+ throw new DriverFatalRuntimeException(errMsg);
}
}
@Override
- public synchronized int getNumRecoveredContainers() {
- return this.numRecoveredContainers;
+ public synchronized Set<String> getRecoveredEvaluatorIds() {
+ // Return a read-only snapshot of the recovered set taken under the lock: this set keeps growing as
+ // evaluators check in, so a live view iterated outside the lock could race with evaluatorRecovered.
+ return Collections.unmodifiableSet(new HashSet<>(this.recoveredEvaluators));
}
@Override
- public synchronized void oneContainerRecovered() {
- this.numRecoveredContainers += 1;
- if (this.numRecoveredContainers > this.numPreviousContainers) {
- throw new IllegalStateException("Reconnected to" +
- this.numRecoveredContainers + "Evaluators while only expecting " +
this.numPreviousContainers);
+ public synchronized boolean evaluatorRecovered(final String evaluatorId) {
+ // Reject any evaluator that was not registered via setPreviousEvaluatorIds.
+ if (!this.previousEvaluators.contains(evaluatorId)) {
+ final String errMsg = "Evaluator with evaluator ID " + evaluatorId + "
not expected to be alive.";
+ LOG.log(Level.SEVERE, errMsg);
+ throw new DriverFatalRuntimeException(errMsg);
+ }
+
+ // A repeated report leaves the set unchanged; it is only logged as a warning.
+ if (!this.recoveredEvaluators.add(evaluatorId)) {
+ LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + "
added to the set" +
+ " of recovered evaluators more than once. Ignoring second add...");
}
+
+ // Restart is complete once every expected evaluator has reported back; once set, the flag is never cleared.
+ // NOTE(review): because the flag stays true, a repeated report arriving after completion also returns true.
+ // EvaluatorManager dispatches onDriverRestartCompleted on every true return, so the completion event could
+ // be dispatched more than once -- consider signalling only the false-to-true transition.
+ if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
+ this.restartCompleted = true;
+ }
+
+ return this.restartCompleted;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1dc1cf3a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index e73cfef..d546e59 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -397,24 +397,17 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
this.evaluatorControlHandler.setRemoteID(evaluatorRID);
this.stateManager.setRunning();
- this.driverRestartManager.get().oneContainerRecovered();
- final int numRecoveredContainers =
this.driverRestartManager.get().getNumRecoveredContainers();
+ boolean restartCompleted =
this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId);
LOG.log(Level.FINE, "Received recovery heartbeat from evaluator
{0}.", this.evaluatorId);
- final int expectedEvaluatorsNumber =
this.driverRestartManager.get().getNumPreviousContainers();
-
- if (numRecoveredContainers > expectedEvaluatorsNumber) {
- LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators,
but [{1}] evaluators have checked in.",
- new Object[]{expectedEvaluatorsNumber,
numRecoveredContainers});
- throw new RuntimeException("More then expected number of
evaluators are checking in during recovery.");
- } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
- LOG.log(Level.INFO, "All [{0}] expected evaluators have checked
in. Recovery completed.",
- expectedEvaluatorsNumber);
- this.driverRestartManager.get().setRestartCompleted();
+
+ if (restartCompleted) {
this.messageDispatcher.onDriverRestartCompleted(new
DriverRestartCompleted(System.currentTimeMillis()));
+ LOG.log(Level.INFO, "All expected evaluators checked in.");
} else {
- LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}]
evaluators have checked in.",
- new Object[]{expectedEvaluatorsNumber,
numRecoveredContainers});
+ LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
+ new
Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(),
+
this.driverRestartManager.get().getRecoveredEvaluatorIds()});
}
} else {
final String errorMsg = "Restart configurations are not set
properly. The DriverRestartManager is missing.";