Repository: incubator-reef
Updated Branches:
refs/heads/master 9b971979f -> ff336f33f
[REEF-617] Enable creation of EvaluatorManager on restarted evaluators
This addresses the issue by:
* Adding helper functions to ``EvaluatorManagerFactory`` to create
EvaluatorManagers for recovered evaluators.
* Recovering the ``EvaluatorManager`` on the first heartbeat of an expected
Evaluator on restart.
* Fixing a bug in ``DriverRestartManager`` where the boolean check is
reversed.
JIRA:
[REEF-617](https://issues.apache.org/jira/browse/REEF-617)
Pull Request:
This closes #422
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ff336f33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ff336f33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ff336f33
Branch: refs/heads/master
Commit: ff336f33f75d783deb34fad1512928c67131b7ed
Parents: 9b97197
Author: Andrew Chung <[email protected]>
Authored: Wed Aug 26 15:32:52 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 27 11:21:00 2015 -0700
----------------------------------------------------------------------
.../driver/restart/DriverRestartManager.java | 20 ++++-
.../driver/context/ContextRepresenters.java | 16 +---
.../evaluator/EvaluatorHeartbeatHandler.java | 25 +++++-
.../evaluator/EvaluatorManagerFactory.java | 81 +++++++++++++-------
.../common/driver/evaluator/Evaluators.java | 2 +-
.../resourcemanager/ResourceStatusHandler.java | 5 +-
.../common/driver/task/TaskRepresenter.java | 3 +-
.../driver/YarnDriverRuntimeRestartManager.java | 1 +
8 files changed, 104 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 19f2b64..52764e4 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -27,6 +27,7 @@ import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
+import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
import javax.inject.Inject;
import java.util.*;
@@ -122,6 +123,19 @@ public final class DriverRestartManager {
}
/**
+ * @return The ResourceRecoverEvent of the specified evaluator. Throws a
{@link DriverFatalRuntimeException} if
+ * the evaluator does not exist in the set of known evaluators.
+ */
+ public synchronized ResourceRecoverEvent getResourceRecoverEvent(final
String evaluatorId) {
+ if (!this.restartEvaluators.contains(evaluatorId)) {
+ throw new DriverFatalRuntimeException("Unexpected evaluator [" +
evaluatorId + "], should " +
+ "not have been recorded.");
+ }
+
+ return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent();
+ }
+
+ /**
* Indicate that this Driver has re-established the connection with one more
Evaluator of a previous run.
* @return true if the evaluator has been newly recovered.
*/
@@ -179,9 +193,9 @@ public final class DriverRestartManager {
}
/**
- * Signals to the {@link DriverRestartManager} that an evaluator has had its
running task processed.
+ * Signals to the {@link DriverRestartManager} that an evaluator has had its
running task or active context processed.
*/
- public synchronized void setEvaluatorRunningTask(final String evaluatorId) {
+ public synchronized void setEvaluatorProcessed(final String evaluatorId) {
setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
}
@@ -193,7 +207,7 @@ public final class DriverRestartManager {
}
private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final
String evaluatorId) {
- if (this.restartEvaluators.contains(evaluatorId)) {
+ if (!this.restartEvaluators.contains(evaluatorId)) {
return EvaluatorRestartState.NOT_EXPECTED;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index 8abc19d..4936b9a 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -23,8 +23,6 @@ import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.proto.ReefServiceProtos;
import
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
import org.apache.reef.util.Optional;
@@ -45,7 +43,6 @@ public final class ContextRepresenters {
private final EvaluatorMessageDispatcher messageDispatcher;
private final ContextFactory contextFactory;
- private final DriverRestartManager driverRestartManager;
// Mutable fields
@GuardedBy("this")
@@ -55,11 +52,9 @@ public final class ContextRepresenters {
@Inject
private ContextRepresenters(final EvaluatorMessageDispatcher
messageDispatcher,
- final ContextFactory contextFactory,
- final DriverRestartManager driverRestartManager)
{
+ final ContextFactory contextFactory) {
this.messageDispatcher = messageDispatcher;
this.contextFactory = contextFactory;
- this.driverRestartManager = driverRestartManager;
}
/**
@@ -215,13 +210,8 @@ public final class ContextRepresenters {
Optional.of(contextStatusProto.getParentId()) :
Optional.<String>empty();
final EvaluatorContext context = contextFactory.newContext(contextID,
parentID);
this.addContext(context);
- if
(driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) ==
EvaluatorRestartState.REREGISTERED) {
- // when we get a recovered active context, always notify application
- this.messageDispatcher.onDriverRestartContextActive(context);
- } else {
- if (notifyClientOnNewActiveContext) {
- this.messageDispatcher.onContextActive(context);
- }
+ if (notifyClientOnNewActiveContext) {
+ this.messageDispatcher.onContextActive(context);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index ee1af9f..9011d7f 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -76,8 +76,8 @@ public final class EvaluatorHeartbeatHandler
if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) {
LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported
back to the driver after restart.");
- // TODO[REEF-617]: Create EvaluatorManager, add to this.evaluators,
and call onEvaluatorHeartbeatMessage().
+ evaluators.put(recoverEvaluatorManager(evaluatorId,
evaluatorHeartbeatMessage));
} else {
LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already
been recovered.");
}
@@ -86,7 +86,10 @@ public final class EvaluatorHeartbeatHandler
if (driverRestartManager.getEvaluatorRestartState(evaluatorId) ==
EvaluatorRestartState.EXPIRED) {
LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has
reported back to the driver after restart.");
- // TODO[REEF-617]: Create EvaluatorManager, call
onEvaluatorHeartbeatMessage, and close it.
+
+ // Create the evaluator manager, analyze its heartbeat, but don't add
it to the set of Evaluators.
+ // Immediately close it.
+ recoverEvaluatorManager(evaluatorId,
evaluatorHeartbeatMessage).close();
return;
}
@@ -102,4 +105,22 @@ public final class EvaluatorHeartbeatHandler
LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
}
}
+
+ /**
+ * Creates an EvaluatorManager for recovered evaluator.
+ * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)}
should not
+ * do anything if driver restart period has expired. Expired evaluators
should be immediately closed
+ * upon return of this function, while evaluators that have not yet expired
should be recorded and added
+ * to the {@link Evaluators} object.
+ */
+ private EvaluatorManager recoverEvaluatorManager(
+ final String evaluatorId,
+ final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>
evaluatorHeartbeatMessage) {
+ final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory
+ .getNewEvaluatorManagerForRecoveredEvaluator(
+ driverRestartManager.getResourceRecoverEvent(evaluatorId));
+
+
recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
+ return recoveredEvaluatorManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
index fb496f5..4f49fe9 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
@@ -25,10 +25,7 @@ import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.catalog.ResourceCatalog;
import org.apache.reef.driver.evaluator.EvaluatorProcessFactory;
import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
-import
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.*;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -58,6 +55,34 @@ public final class EvaluatorManagerFactory {
this.processFactory = processFactory;
}
+ private EvaluatorManager getNewEvaluatorManagerInstanceForResource(
+ final ResourceEvent resourceEvent) {
+ NodeDescriptor nodeDescriptor =
this.resourceCatalog.getNode(resourceEvent.getNodeId());
+
+ if (nodeDescriptor == null) {
+ final String nodeId = resourceEvent.getNodeId();
+ LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it",
nodeId);
+ final String[] hostNameAndPort = nodeId.split(":");
+ Validate.isTrue(hostNameAndPort.length == 2);
+ final NodeDescriptorEvent nodeDescriptorEvent =
NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
+
.setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
+ .setMemorySize(resourceEvent.getResourceMemory())
+ .setRackName(resourceEvent.getRackName().get()).build();
+ // downcasting not to change the API
+ ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
+ nodeDescriptor = this.resourceCatalog.getNode(nodeId);
+ }
+ final EvaluatorDescriptorImpl evaluatorDescriptor = new
EvaluatorDescriptorImpl(nodeDescriptor,
+ resourceEvent.getResourceMemory(),
resourceEvent.getVirtualCores().get(),
+ processFactory.newEvaluatorProcess());
+
+ LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]",
resourceEvent.getIdentifier());
+ final EvaluatorManager evaluatorManager =
+ getNewEvaluatorManagerInstance(resourceEvent.getIdentifier(),
evaluatorDescriptor);
+
+ return evaluatorManager;
+ }
+
/**
* Helper method to create a new EvaluatorManager instance.
*
@@ -92,30 +117,9 @@ public final class EvaluatorManagerFactory {
* @param resourceAllocationEvent
* @return an EvaluatorManager for the newly allocated Evaluator.
*/
- public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator(
+ public EvaluatorManager getNewEvaluatorManagerForNewEvaluator(
final ResourceAllocationEvent resourceAllocationEvent) {
- NodeDescriptor nodeDescriptor =
this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());
-
- if (nodeDescriptor == null) {
- final String nodeId = resourceAllocationEvent.getNodeId();
- LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it",
nodeId);
- final String[] hostNameAndPort = nodeId.split(":");
- Validate.isTrue(hostNameAndPort.length == 2);
- final NodeDescriptorEvent nodeDescriptorEvent =
NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
-
.setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
- .setMemorySize(resourceAllocationEvent.getResourceMemory())
- .setRackName(resourceAllocationEvent.getRackName().get()).build();
- // downcasting not to change the API
- ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
- nodeDescriptor = this.resourceCatalog.getNode(nodeId);
- }
- final EvaluatorDescriptorImpl evaluatorDescriptor = new
EvaluatorDescriptorImpl(nodeDescriptor,
- resourceAllocationEvent.getResourceMemory(),
resourceAllocationEvent.getVirtualCores().get(),
- processFactory.newEvaluatorProcess());
-
- LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]",
resourceAllocationEvent.getIdentifier());
- final EvaluatorManager evaluatorManager =
-
getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(),
evaluatorDescriptor);
+ final EvaluatorManager evaluatorManager =
getNewEvaluatorManagerInstanceForResource(resourceAllocationEvent);
evaluatorManager.fireEvaluatorAllocatedEvent();
return evaluatorManager;
@@ -134,4 +138,27 @@ public final class EvaluatorManagerFactory {
return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(),
new EvaluatorDescriptorImpl(null, 128, 1,
processFactory.newEvaluatorProcess()));
}
+
+ /**
+ * Instantiates a new EvaluatorManager for a failed evaluator during driver
restart.
+ * Does not fire an EvaluatorAllocatedEvent.
+ * @param resourceStatusEvent
+ * @return an EvaluatorManager for the user to call fail on.
+ */
+ public EvaluatorManager
getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart(
+ final ResourceStatusEvent resourceStatusEvent) {
+ return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(),
+ new EvaluatorDescriptorImpl(null, 128, 1,
processFactory.newEvaluatorProcess()));
+ }
+
+ /**
+ * Instantiates a new EvaluatorManager based on a resource allocation from a
recovered evaluator.
+ *
+ * @param resourceRecoverEvent
+ * @return an EvaluatorManager for the newly allocated Evaluator.
+ */
+ public EvaluatorManager getNewEvaluatorManagerForRecoveredEvaluator(
+ final ResourceRecoverEvent resourceRecoverEvent) {
+ return getNewEvaluatorManagerInstanceForResource(resourceRecoverEvent);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index fe464d9..dd6e3ff 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -107,7 +107,7 @@ public final class Evaluators implements AutoCloseable {
public synchronized void put(
final EvaluatorManagerFactory evaluatorManagerFactory,
final ResourceAllocationEvent evaluatorMsg) {
-
this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewlyAllocatedEvaluator(evaluatorMsg));
+
this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
index 70a3cce..3b0a75a 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -57,8 +57,9 @@ public final class ResourceStatusHandler implements
EventHandler<ResourceStatusE
evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent);
} else {
if (resourceStatusEvent.getIsFromPreviousDriver().get()) {
- final EvaluatorManager previousEvaluatorManager =
-
this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusEvent);
+ final EvaluatorManager previousEvaluatorManager =
this.evaluatorManagerFactory
+
.getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart(resourceStatusEvent);
+
previousEvaluatorManager.onResourceStatusMessage(resourceStatusEvent);
} else {
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index a09532b..0107878 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -88,6 +88,7 @@ public final class TaskRepresenter {
throw new RuntimeException("Received a message for task " +
taskStatusProto.getTaskId() +
" in the TaskRepresenter for Task " + this.taskId);
}
+
if
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) ==
EvaluatorRestartState.REREGISTERED) {
// when a recovered heartbeat is received, we will take its word for it
LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
@@ -142,7 +143,7 @@ public final class TaskRepresenter {
if
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) ==
EvaluatorRestartState.REREGISTERED) {
final RunningTask runningTask = new RunningTaskImpl(
this.evaluatorManager, this.taskId, this.context, this);
-
this.driverRestartManager.setEvaluatorRunningTask(evaluatorManager.getId());
+
this.driverRestartManager.setEvaluatorProcessed(evaluatorManager.getId());
this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index 495a777..c9a1c34 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -243,6 +243,7 @@ public final class YarnDriverRuntimeRestartManager
implements DriverRuntimeResta
.setState(ReefServiceProtos.State.FAILED)
.setExitCode(1)
.setDiagnostics("Container [" + evaluatorId + "] failed during
driver restart process.")
+ .setIsFromPreviousDriver(true)
.build());
}
}