Repository: incubator-reef
Updated Branches:
refs/heads/master 9b16d54cd -> 8efffd910
[REEF-634] Capture Restarted Evaluators in a POJO
This addresses the issue by
* Creating ResourceRecoverEvent and changing EvaluatorRestartInfo to
save the state of (potentially) restarted Evaluator.
* Creating the interface ResourceEvent and having ResourceAllocationEvent
and ResourceRecoverEvent extend it.
* Having DriverRestartManager remember the status and other information
of the evaluators to recover/fail on restart.
JIRA:
[REEF-634](https://issues.apache.org/jira/browse/REEF-634)
Pull Request:
This closes #408
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8efffd91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8efffd91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8efffd91
Branch: refs/heads/master
Commit: 8efffd910e21f390a2ed08d4d9a84ce8ce9c2e51
Parents: 9b16d54
Author: Andrew Chung <[email protected]>
Authored: Fri Aug 21 18:25:24 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Aug 25 17:51:55 2015 -0700
----------------------------------------------------------------------
.../DefaultDriverRuntimeRestartMangerImpl.java | 6 +-
.../driver/restart/DriverRestartManager.java | 108 +++++++-------
.../restart/DriverRuntimeRestartManager.java | 4 +-
.../driver/restart/EvaluatorRestartInfo.java | 67 ++++++---
.../driver/restart/EvaluatorRestartState.java | 20 ++-
.../reef/driver/restart/RestartEvaluators.java | 101 ++++++++++++++
.../ResourceAllocationEvent.java | 33 +----
.../ResourceAllocationEventImpl.java | 129 -----------------
.../driver/resourcemanager/ResourceEvent.java | 57 ++++++++
.../resourcemanager/ResourceEventImpl.java | 139 +++++++++++++++++++
.../resourcemanager/ResourceRecoverEvent.java | 30 ++++
.../runtime/local/driver/ResourceManager.java | 4 +-
.../runtime/mesos/driver/REEFScheduler.java | 4 +-
.../yarn/driver/YarnContainerManager.java | 4 +-
.../driver/YarnDriverRuntimeRestartManager.java | 30 ++--
15 files changed, 480 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
index 12934f7..277b872 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
@@ -57,9 +57,9 @@ final class DefaultDriverRuntimeRestartMangerImpl implements
DriverRuntimeRestar
}
@Override
- public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
+ public RestartEvaluators getPreviousEvaluators() {
throw new DriverFatalRuntimeException(
- "Restart is not enabled. getAliveAndFailedEvaluators should not have
been called.");
+ "Restart is not enabled. getPreviousEvaluators should not have been
called.");
}
@Override
@@ -67,4 +67,4 @@ final class DefaultDriverRuntimeRestartMangerImpl implements
DriverRuntimeRestar
throw new DriverFatalRuntimeException(
"Restart is not enabled. informAboutEvaluatorFailures should not have
been called.");
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 3e1be1f..d6b823c 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -38,7 +38,8 @@ import java.util.logging.Logger;
public final class DriverRestartManager {
private static final Logger LOG =
Logger.getLogger(DriverRestartManager.class.getName());
private final DriverRuntimeRestartManager driverRuntimeRestartManager;
- private final Map<String, EvaluatorRestartState> previousEvaluators = new
HashMap<>();
+
+ private RestartEvaluators restartEvaluators;
private DriverRestartState state = DriverRestartState.NotRestarted;
@Inject
@@ -81,9 +82,17 @@ public final class DriverRestartManager {
* as alive to the job driver.
*/
public synchronized void onRestart() {
- final EvaluatorRestartInfo evaluatorRestartInfo =
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
- setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
-
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+ if (this.state == DriverRestartState.RestartBegan) {
+ restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
+ this.state = DriverRestartState.RestartInProgress;
+ } else {
+ final String errMsg = "Should not be setting the set of expected alive
evaluators more than once.";
+ LOG.log(Level.SEVERE, errMsg);
+ throw new DriverFatalRuntimeException(errMsg);
+ }
+
+
driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
+
// TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on
a Timer.
}
@@ -92,32 +101,11 @@ public final class DriverRestartManager {
* if the {@link DriverRestartManager} does not believe that it's an
evaluator to be recovered.
*/
public synchronized EvaluatorRestartState getEvaluatorRestartState(final
String evaluatorId) {
- if (this.state.hasNotRestarted() ||
- !this.previousEvaluators.containsKey(evaluatorId)) {
+ if (this.state.hasNotRestarted()) {
return EvaluatorRestartState.NOT_EXPECTED;
}
- return this.previousEvaluators.get(evaluatorId);
- }
-
- /**
- * Set the Evaluators to expect still active from a previous execution of
the Driver in a restart situation.
- * To be called exactly once during a driver restart.
- *
- * @param previousEvaluatorIds the evaluator IDs of the evaluators that are
expected to have survived driver restart.
- */
- private synchronized void setPreviousEvaluatorIds(final Set<String>
previousEvaluatorIds) {
- if (this.state == DriverRestartState.RestartBegan) {
- for (final String previousEvaluatorId : previousEvaluatorIds) {
- setEvaluatorExpected(previousEvaluatorId);
- }
-
- this.state = DriverRestartState.RestartInProgress;
- } else {
- final String errMsg = "Should not be setting the set of expected alive
evaluators more than once.";
- LOG.log(Level.SEVERE, errMsg);
- throw new DriverFatalRuntimeException(errMsg);
- }
+ return getStateOfPreviousEvaluator(evaluatorId);
}
/**
@@ -126,14 +114,13 @@ public final class DriverRestartManager {
* @return true if the driver restart is completed.
*/
public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String
evaluatorId) {
- if (!this.previousEvaluators.containsKey(evaluatorId) ||
- this.previousEvaluators.get(evaluatorId) ==
EvaluatorRestartState.NOT_EXPECTED) {
+ if (getStateOfPreviousEvaluator(evaluatorId) ==
EvaluatorRestartState.NOT_EXPECTED) {
final String errMsg = "Evaluator with evaluator ID " + evaluatorId + "
not expected to be alive.";
LOG.log(Level.SEVERE, errMsg);
throw new DriverFatalRuntimeException(errMsg);
}
- if (this.previousEvaluators.get(evaluatorId) !=
EvaluatorRestartState.EXPECTED) {
+ if (getStateOfPreviousEvaluator(evaluatorId) !=
EvaluatorRestartState.EXPECTED) {
LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + "
added to the set" +
" of recovered evaluators more than once. Ignoring second add...");
} else {
@@ -160,74 +147,79 @@ public final class DriverRestartManager {
}
/**
- * Signals to the {@link DriverRestartManager} that an evaluator is to be
expected to report back after restart.
- */
- public synchronized void setEvaluatorExpected(final String evaluatorId) {
- if (previousEvaluators.containsKey(evaluatorId)) {
- LOG.log(Level.WARNING, "Evaluator " + evaluatorId + " is already added
to the set of previous evaluators with " +
- "state [" + previousEvaluators.get(evaluatorId) + "]. Ignoring...");
- return;
- }
-
- previousEvaluators.put(evaluatorId, EvaluatorRestartState.EXPECTED);
- }
-
- /**
* Signals to the {@link DriverRestartManager} that an evaluator has
reported back after restart.
*/
public synchronized void setEvaluatorReported(final String evaluatorId) {
- setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REPORTED);
+ setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED);
}
/**
* Signals to the {@link DriverRestartManager} that an evaluator has had its
recovery heartbeat processed.
*/
public synchronized void setEvaluatorReregistered(final String evaluatorId) {
- setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REREGISTERED);
+ setStateOfPreviousEvaluator(evaluatorId,
EvaluatorRestartState.REREGISTERED);
}
/**
* Signals to the {@link DriverRestartManager} that an evaluator has had its
running task processed.
*/
public synchronized void setEvaluatorRunningTask(final String evaluatorId) {
- setPreviousEvaluatorState(
- evaluatorId, EvaluatorRestartState.PROCESSED);
+ setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
}
/**
* Signals to the {@link DriverRestartManager} that an expected evaluator
has been expired.
*/
public synchronized void setEvaluatorExpired(final String evaluatorId) {
- setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.EXPIRED);
+ setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED);
}
- private synchronized void setPreviousEvaluatorState(final String evaluatorId,
- final
EvaluatorRestartState to) {
- if (!previousEvaluators.containsKey(evaluatorId) ||
-
!EvaluatorRestartState.isLegalTransition(previousEvaluators.get(evaluatorId),
to)) {
- throw evaluatorTransitionFailed(evaluatorId, to);
+ /**
+ * Looks up the restart state of an evaluator from a previous driver attempt.
+ *
+ * @param evaluatorId the ID of the evaluator to look up.
+ * @return the evaluator's current {@link EvaluatorRestartState}, or
+ * {@link EvaluatorRestartState#NOT_EXPECTED} if the restart information has not been
+ * obtained yet or the evaluator is not one of the previous evaluators.
+ */
+ private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) {
+ // restartEvaluators is assigned in onRestart(); it is null before that. Only dereference
+ // the entry when the evaluator is actually known, otherwise get() would return null.
+ if (this.restartEvaluators == null || !this.restartEvaluators.contains(evaluatorId)) {
+ return EvaluatorRestartState.NOT_EXPECTED;
}
- previousEvaluators.put(evaluatorId, to);
+ return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState();
+ }
+
+ private synchronized void setStateOfPreviousEvaluator(final String
evaluatorId,
+ final
EvaluatorRestartState to) {
+ if (!restartEvaluators.contains(evaluatorId) ||
+ !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) {
+ throw evaluatorTransitionFailed(evaluatorId, to);
+ }
}
private synchronized DriverFatalRuntimeException
evaluatorTransitionFailed(final String evaluatorId,
final EvaluatorRestartState to) {
- if (!previousEvaluators.containsKey(evaluatorId)) {
+ if (!restartEvaluators.contains(evaluatorId)) {
return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is
not expected.");
}
return new DriverFatalRuntimeException("Evaluator " + evaluatorId + "
wants to transition to state " +
- "[" + to + "], but is in the illegal state [" +
previousEvaluators.get(evaluatorId) + "].");
+ "[" + to + "], but is in the illegal state [" +
+ restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "].");
}
private synchronized boolean haveAllExpectedEvaluatorsReported() {
- for (final EvaluatorRestartState evaluatorRestartState :
this.previousEvaluators.values()) {
- if (!evaluatorRestartState.hasReported()) {
+ for (final String previousEvaluatorId :
this.restartEvaluators.getEvaluatorIds()) {
+ final EvaluatorRestartState restartState =
getStateOfPreviousEvaluator(previousEvaluatorId);
+ if (restartState == EvaluatorRestartState.EXPECTED) {
return false;
}
}
return true;
}
+
+ private Set<String> getFailedEvaluators() {
+ final Set<String> failed = new HashSet<>();
+ for (final String previousEvaluatorId :
this.restartEvaluators.getEvaluatorIds()) {
+ if (getStateOfPreviousEvaluator(previousEvaluatorId) ==
EvaluatorRestartState.FAILED) {
+ failed.add(previousEvaluatorId);
+ }
+ }
+
+ return failed;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
index 4e38b4b..5e1acec 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -58,9 +58,9 @@ public interface DriverRuntimeRestartManager {
/**
* Gets the sets of alive and failed evaluators based on the runtime
implementation.
- * @return EvaluatorRestartInfo, which encapsulates the alive and failed set
of evaluator IDs.
+ * @return A map which encapsulates the states of previous evaluators.
*/
- EvaluatorRestartInfo getAliveAndFailedEvaluators();
+ RestartEvaluators getPreviousEvaluators();
/**
* Informs the necessary components about failed evaluators. The
implementation is runtime dependent.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
index 10beb78..e058c9a 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
@@ -21,36 +21,71 @@ package org.apache.reef.driver.restart;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-
-import java.util.Collections;
-import java.util.Set;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
+import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
/**
- * The encapsulating class for alive and failed evaluators on driver restart.
+ * An object that encapsulates the information needed to construct an
+ * {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager}
for a recovered evaluator
+ * on restart.
*/
@Private
@DriverSide
@Unstable
public final class EvaluatorRestartInfo {
- private final Set<String> aliveEvaluators;
- private final Set<String> failedEvaluators;
+ private final ResourceRecoverEvent resourceRecoverEvent;
+ private EvaluatorRestartState evaluatorRestartState;
+
+ /**
+ * Creates an {@link EvaluatorRestartInfo} object that represents the
information of an evaluator that is expected
+ * to recover.
+ */
+ public static EvaluatorRestartInfo createExpectedEvaluatorInfo(final
ResourceRecoverEvent resourceRecoverEvent) {
+ return new EvaluatorRestartInfo(resourceRecoverEvent,
EvaluatorRestartState.EXPECTED);
+ }
+
+ /**
+ * Creates an {@link EvaluatorRestartInfo} object that represents the
information of an evaluator that
+ * has failed on driver restart.
+ */
+ public static EvaluatorRestartInfo createFailedEvaluatorInfo(final String
evaluatorId) {
+ final ResourceRecoverEvent resourceRecoverEvent =
+
ResourceEventImpl.newRecoveryBuilder().setIdentifier(evaluatorId).build();
- public EvaluatorRestartInfo(final Set<String> aliveEvaluators, final
Set<String> failedEvaluators) {
- this.aliveEvaluators = Collections.unmodifiableSet(aliveEvaluators);
- this.failedEvaluators = Collections.unmodifiableSet(failedEvaluators);
+ return new EvaluatorRestartInfo(resourceRecoverEvent,
EvaluatorRestartState.FAILED);
}
/**
- * @return the set of evaluator IDs for alive evaluators on driver restart.
The returned set is unmodifiable.
+ * @return the {@link ResourceRecoverEvent} that contains the information
(e.g. resource MB, node ID, Evaluator ID...)
+ * needed to reconstruct the {@link
org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager} of the
+ * recovered evaluator on restart.
*/
- public Set<String> getAliveEvaluators() {
- return this.aliveEvaluators;
+ public ResourceRecoverEvent getResourceRecoverEvent() {
+ return resourceRecoverEvent;
}
/**
- * @return the set of evaluator IDs for faiuled evaluators on driver
restart. The returned set is unmodifiable.
+ * @return the current process of the restart.
*/
- public Set<String> getFailedEvaluators() {
- return this.failedEvaluators;
+ public EvaluatorRestartState getEvaluatorRestartState() {
+ return evaluatorRestartState;
+ }
+
+ /**
+ * Sets the restart state of the evaluator, but only if moving from the current state
+ * to {@code to} is legal according to {@link EvaluatorRestartState#isLegalTransition}.
+ *
+ * @param to the state to transition to.
+ * @return true if the transition was legal and the state was updated; false otherwise,
+ * in which case the current state is left unchanged.
+ */
+ public boolean setEvaluatorRestartState(final EvaluatorRestartState to) {
+ if (EvaluatorRestartState.isLegalTransition(evaluatorRestartState, to)) {
+ this.evaluatorRestartState = to;
+ return true;
+ }
+
+ return false;
+ }
+
+ private EvaluatorRestartInfo(final ResourceRecoverEvent resourceRecoverEvent,
+ final EvaluatorRestartState
evaluatorRestartState) {
+ this.resourceRecoverEvent = resourceRecoverEvent;
+ this.evaluatorRestartState = evaluatorRestartState;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
index 4a0c540..a87052b 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -57,7 +57,12 @@ public enum EvaluatorRestartState {
/**
* The evaluator has only contacted the driver after the expiration period.
*/
- EXPIRED;
+ EXPIRED,
+
+ /**
+ * The evaluator has failed on driver restart.
+ */
+ FAILED;
/**
* @return true if the transition of {@link EvaluatorRestartState} is legal.
@@ -103,4 +108,17 @@ public enum EvaluatorRestartState {
return false;
}
}
+
+ /**
+ * @return true if the evaluator has failed on driver restart or is not
expected to report back to the driver.
+ */
+ public boolean isFailedOrNotExpected() {
+ switch(this) {
+ case FAILED:
+ case NOT_EXPECTED:
+ return true;
+ default:
+ return false;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
new file mode 100644
index 0000000..c3f8857
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.BuilderUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Holds the set of Evaluator information needed to recover EvaluatorManagers
+ * on the restarted Driver, keyed by Evaluator ID.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class RestartEvaluators {
+ private final Map<String, EvaluatorRestartInfo> restartEvaluatorsMap;
+
+ private RestartEvaluators(final Map<String, EvaluatorRestartInfo> restartEvaluatorsMap){
+ this.restartEvaluatorsMap = BuilderUtils.notNull(restartEvaluatorsMap);
+ }
+
+ /**
+ * @return true if Evaluator with evaluatorId can be an Evaluator from
+ * previous application attempts.
+ */
+ boolean contains(final String evaluatorId) {
+ return restartEvaluatorsMap.containsKey(evaluatorId);
+ }
+
+ /**
+ * @return The {@link EvaluatorRestartInfo} of an Evaluator from
+ * previous application attempts, or null if no Evaluator with evaluatorId is known.
+ */
+ EvaluatorRestartInfo get(final String evaluatorId) {
+ return restartEvaluatorsMap.get(evaluatorId);
+ }
+
+ /**
+ * @return The set of Evaluator IDs of Evaluators from previous
+ * application attempts. The returned set is unmodifiable.
+ */
+ Set<String> getEvaluatorIds() {
+ return restartEvaluatorsMap.keySet();
+ }
+
+ /**
+ * @return a new Builder to build an instance of {@link RestartEvaluators}.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link RestartEvaluators}. Each Evaluator ID can be added at most once.
+ */
+ public static final class Builder implements org.apache.reef.util.Builder<RestartEvaluators>{
+ private final Map<String, EvaluatorRestartInfo> restartInfoMap = new HashMap<>();
+
+ private Builder(){
+ }
+
+ /**
+ * Adds the restart information of an Evaluator, keyed by the identifier of its
+ * {@link EvaluatorRestartInfo#getResourceRecoverEvent()}.
+ *
+ * @param evaluatorRestartInfo the restart information to add.
+ * @return true if it was added; false if it is null, its identifier is null,
+ * or an Evaluator with the same identifier was already added.
+ */
+ public boolean addRestartEvaluator(final EvaluatorRestartInfo evaluatorRestartInfo) {
+ if (evaluatorRestartInfo == null) {
+ return false;
+ }
+
+ final String evaluatorId = evaluatorRestartInfo.getResourceRecoverEvent().getIdentifier();
+ if (evaluatorId == null || restartInfoMap.containsKey(evaluatorId)) {
+ return false;
+ }
+
+ restartInfoMap.put(evaluatorId, evaluatorRestartInfo);
+ return true;
+ }
+
+ @Override
+ public RestartEvaluators build() {
+ // Snapshot the map: wrapping restartInfoMap directly would only create a view, so later
+ // addRestartEvaluator calls on this Builder would change an already-built instance.
+ return new RestartEvaluators(Collections.unmodifiableMap(new HashMap<>(restartInfoMap)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
index e507d90..5b0156e 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
@@ -20,8 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-import org.apache.reef.util.Optional;
/**
* Event from Driver Runtime -> Driver Process
@@ -29,32 +27,5 @@ import org.apache.reef.util.Optional;
*/
@RuntimeAuthor
@DriverSide
-@DefaultImplementation(ResourceAllocationEventImpl.class)
-public interface ResourceAllocationEvent {
-
- /**
- * @return Id of the allocated resource
- */
- String getIdentifier();
-
- /**
- * @return Memory size of the resource, in MB
- */
- int getResourceMemory();
-
- /**
- * @return Id of the node where resource was allocated
- */
- String getNodeId();
-
- /**
- * @return Number of virtual CPU cores on the resource
- */
- Optional<Integer> getVirtualCores();
-
- /**
- * @return Rack name of the resource
- */
- Optional<String> getRackName();
-
-}
+public interface ResourceAllocationEvent extends ResourceEvent {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
deleted file mode 100644
index f8c5ade..0000000
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.common.driver.resourcemanager;
-
-import org.apache.reef.util.BuilderUtils;
-import org.apache.reef.util.Optional;
-
-/**
- * Default POJO implementation of ResourceAllocationEvent.
- * Use newBuilder to construct an instance.
- */
-public final class ResourceAllocationEventImpl implements
ResourceAllocationEvent {
- private final String identifier;
- private final int resourceMemory;
- private final String nodeId;
- private final Optional<Integer> virtualCores;
- private final Optional<String> rackName;
-
-
- private ResourceAllocationEventImpl(final Builder builder) {
- this.identifier = BuilderUtils.notNull(builder.identifier);
- this.resourceMemory = BuilderUtils.notNull(builder.resourceMemory);
- this.nodeId = BuilderUtils.notNull(builder.nodeId);
- this.virtualCores = Optional.ofNullable(builder.virtualCores);
- this.rackName = Optional.ofNullable(builder.rackName);
- }
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public int getResourceMemory() {
- return resourceMemory;
- }
-
- @Override
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public Optional<Integer> getVirtualCores() {
- return virtualCores;
- }
-
- @Override
- public Optional<String> getRackName() {
- return rackName;
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Builder used to create ResourceAllocationEvent instances.
- */
- public static final class Builder implements
org.apache.reef.util.Builder<ResourceAllocationEvent> {
- private String identifier;
- private Integer resourceMemory;
- private String nodeId;
- private Integer virtualCores;
- private String rackName;
-
-
- /**
- * @see ResourceAllocationEvent#getIdentifier()
- */
- public Builder setIdentifier(final String identifier) {
- this.identifier = identifier;
- return this;
- }
-
- /**
- * @see ResourceAllocationEvent#getResourceMemory()
- */
- public Builder setResourceMemory(final int resourceMemory) {
- this.resourceMemory = resourceMemory;
- return this;
- }
-
- /**
- * @see ResourceAllocationEvent#getNodeId()
- */
- public Builder setNodeId(final String nodeId) {
- this.nodeId = nodeId;
- return this;
- }
-
- /**
- * @see ResourceAllocationEvent#getVirtualCores()
- */
- public Builder setVirtualCores(final int virtualCores) {
- this.virtualCores = virtualCores;
- return this;
- }
-
- /**
- * @see ResourceAllocationEvent#getRackName()
- */
- public Builder setRackName(final String rackName) {
- this.rackName = rackName;
- return this;
- }
-
- @Override
- public ResourceAllocationEvent build() {
- return new ResourceAllocationEventImpl(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
new file mode 100644
index 0000000..baae87e
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+/**
+ * An interface capturing the characteristics of a resource event.
+ */
+@DriverSide
+@Private
+public interface ResourceEvent {
+
+ /**
+ * @return Id of the resource
+ */
+ String getIdentifier();
+
+ /**
+ * @return Memory size of the resource, in MB
+ */
+ int getResourceMemory();
+
+ /**
+ * @return Id of the node where resource is
+ */
+ String getNodeId();
+
+ /**
+ * @return Number of virtual CPU cores on the resource
+ */
+ Optional<Integer> getVirtualCores();
+
+ /**
+ * @return Rack name of the resource
+ */
+ Optional<String> getRackName();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
new file mode 100644
index 0000000..8d43be6
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.util.BuilderUtils;
+import org.apache.reef.util.Optional;
+
+/**
+ * Default POJO implementation of {@link ResourceAllocationEvent} and {@link ResourceRecoverEvent}.
+ * Use {@link #newAllocationBuilder()} to construct an instance for ResourceAllocationEvent and
+ * use {@link #newRecoveryBuilder()} to construct an instance for ResourceRecoverEvent.
+ */
+public final class ResourceEventImpl implements ResourceAllocationEvent, ResourceRecoverEvent {
+  private final String identifier;
+  private final int resourceMemory;
+  private final String nodeId;
+  private final Optional<Integer> virtualCores;
+  private final Optional<String> rackName;
+
+  private ResourceEventImpl(final Builder builder) {
+    this.identifier = BuilderUtils.notNull(builder.identifier);
+    // resourceMemory is stored as a primitive int, so it is mandatory for both allocation and
+    // recovery events. Checking it explicitly replaces the bare NullPointerException that
+    // auto-unboxing an unset (null) builder value would otherwise throw.
+    this.resourceMemory = BuilderUtils.notNull(builder.resourceMemory);
+    // nodeId is only mandatory for allocation events; recovery events may leave it null.
+    this.nodeId = builder.recovery ? builder.nodeId : BuilderUtils.notNull(builder.nodeId);
+    this.virtualCores = Optional.ofNullable(builder.virtualCores);
+    this.rackName = Optional.ofNullable(builder.rackName);
+  }
+
+  @Override
+  public String getIdentifier() {
+    return identifier;
+  }
+
+  @Override
+  public int getResourceMemory() {
+    return resourceMemory;
+  }
+
+  @Override
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  @Override
+  public Optional<Integer> getVirtualCores() {
+    return virtualCores;
+  }
+
+  @Override
+  public Optional<String> getRackName() {
+    return rackName;
+  }
+
+  /**
+   * @return a Builder for a ResourceAllocationEvent; identifier, resourceMemory and nodeId must be set.
+   */
+  public static Builder newAllocationBuilder() {
+    return new Builder(false);
+  }
+
+  /**
+   * @return a Builder for a ResourceRecoverEvent; identifier and resourceMemory must be set,
+   * nodeId is optional.
+   */
+  public static Builder newRecoveryBuilder() {
+    return new Builder(true);
+  }
+
+  /**
+   * Builder used to create ResourceAllocationEvent and ResourceRecoverEvent instances.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<ResourceEventImpl> {
+    // Set by the factory method; true only for newRecoveryBuilder(), which relaxes the nodeId check.
+    private final boolean recovery;
+
+    private String identifier;
+    private Integer resourceMemory;
+    private String nodeId;
+    private Integer virtualCores;
+    private String rackName;
+
+    private Builder(final boolean recovery) {
+      this.recovery = recovery;
+    }
+
+    /**
+     * @see ResourceEvent#getIdentifier()
+     */
+    public Builder setIdentifier(final String identifier) {
+      this.identifier = identifier;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getResourceMemory()
+     */
+    public Builder setResourceMemory(final int resourceMemory) {
+      this.resourceMemory = resourceMemory;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getNodeId()
+     */
+    public Builder setNodeId(final String nodeId) {
+      this.nodeId = nodeId;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getVirtualCores()
+     */
+    public Builder setVirtualCores(final int virtualCores) {
+      this.virtualCores = virtualCores;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getRackName()
+     */
+    public Builder setRackName(final String rackName) {
+      this.rackName = rackName;
+      return this;
+    }
+
+    @Override
+    public ResourceEventImpl build() {
+      return new ResourceEventImpl(this);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
new file mode 100644
index 0000000..0f90c9a
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+/**
+ * Resource recovered by the {@link org.apache.reef.driver.restart.DriverRestartManager} on driver restart.
+ */
+@DriverSide
+@RuntimeAuthor
+public interface ResourceRecoverEvent extends ResourceEvent {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
index ba498e2..b1f0097 100644
---
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
+++
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -27,7 +27,7 @@ import
org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
import
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.FileResource;
@@ -217,7 +217,7 @@ public final class ResourceManager {
requestQueue.satisfyOne();
final Container container = cont.get();
// Tell the receivers about it
- final ResourceAllocationEvent alloc =
ResourceAllocationEventImpl.newBuilder()
+ final ResourceAllocationEvent alloc =
ResourceEventImpl.newAllocationBuilder()
.setIdentifier(container.getContainerID()).setNodeId(container.getNodeID())
.setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores())
.setRackName(container.getRackName()).build();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
index a363e6a..533454b 100644
---
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
+++
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
@@ -28,7 +28,7 @@ import
org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
import
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
@@ -384,7 +384,7 @@ final class REEFScheduler implements Scheduler {
this.executors.add(taskStatus.getTaskId().getValue(),
resourceRequestProto.getMemorySize().get(),
evaluatorControlHandler);
- final ResourceAllocationEvent alloc =
ResourceAllocationEventImpl.newBuilder()
+ final ResourceAllocationEvent alloc =
ResourceEventImpl.newAllocationBuilder()
.setIdentifier(taskStatus.getTaskId().getValue())
.setNodeId(taskStatus.getSlaveId().getValue())
.setResourceMemory(resourceRequestProto.getMemorySize().get())
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 27e5662..0e594ac 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -33,7 +33,7 @@ import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
@@ -408,7 +408,7 @@ final class YarnContainerManager
LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number
= {1}",
new Object[]{container.getResource().getMemory(),
container.getResource().getVirtualCores()});
-
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
+
this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder()
.setIdentifier(container.getId().toString())
.setNodeId(container.getNodeId().toString())
.setResourceMemory(container.getResource().getMemory())
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index fbf64f8..495a777 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -29,8 +29,10 @@ import org.apache.reef.annotations.audience.Private;
import org.apache.reef.annotations.audience.RuntimeAuthor;
import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartInfo;
+import org.apache.reef.driver.restart.RestartEvaluators;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
import org.apache.reef.tang.annotations.Parameter;
@@ -56,6 +58,7 @@ public final class YarnDriverRuntimeRestartManager implements
DriverRuntimeResta
private final ApplicationMasterRegistration registration;
private final REEFEventHandlers reefEventHandlers;
private final YarnContainerManager yarnContainerManager;
+ private final RackNameFormatter rackNameFormatter;
private Set<Container> previousContainers;
@@ -64,11 +67,13 @@ public final class YarnDriverRuntimeRestartManager
implements DriverRuntimeResta
final EvaluatorPreserver
evaluatorPreserver,
final REEFEventHandlers
reefEventHandlers,
final ApplicationMasterRegistration
registration,
- final YarnContainerManager
yarnContainerManager) {
+ final YarnContainerManager
yarnContainerManager,
+ final RackNameFormatter
rackNameFormatter) {
this.registration = registration;
this.evaluatorPreserver = evaluatorPreserver;
this.reefEventHandlers = reefEventHandlers;
this.yarnContainerManager = yarnContainerManager;
+ this.rackNameFormatter = rackNameFormatter;
this.previousContainers = null;
}
@@ -161,15 +166,16 @@ public final class YarnDriverRuntimeRestartManager
implements DriverRuntimeResta
}
/**
- * Used by tDriverRestartManager. Gets the list of previous containers from
the resource manager,
+ * Used by {@link org.apache.reef.driver.restart.DriverRestartManager}.
+ * Gets the list of previous containers from the resource manager,
* compares that list to the YarnDriverRuntimeRestartManager's own list
based on the evalutor preserver,
* and determine which evaluators are alive and which have failed during
restart.
- * @return EvaluatorRestartInfo, the object encapsulating alive and failed
evaluator IDs.
+ * @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for
evaluators that have either failed or survived
+ * driver restart.
*/
@Override
- public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
- final Set<String> recoveredEvaluators = new HashSet<>();
- final Set<String> failedEvaluators = new HashSet<>();
+ public RestartEvaluators getPreviousEvaluators() {
+ final RestartEvaluators.Builder restartEvaluatorsBuilder =
RestartEvaluators.newBuilder();
this.initializeListOfPreviousContainers();
@@ -191,7 +197,8 @@ public final class YarnDriverRuntimeRestartManager
implements DriverRuntimeResta
if (!previousContainersIds.contains(expectedContainerId)) {
LOG.log(Level.WARNING, "Expected container [{0}] not alive, must
have failed during driver restart.",
expectedContainerId);
- failedEvaluators.add(expectedContainerId);
+ restartEvaluatorsBuilder.addRestartEvaluator(
+
EvaluatorRestartInfo.createFailedEvaluatorInfo(expectedContainerId));
}
}
}
@@ -208,11 +215,15 @@ public final class YarnDriverRuntimeRestartManager
implements DriverRuntimeResta
throw new RuntimeException("Not expecting container " +
container.getId().toString());
}
- recoveredEvaluators.add(container.getId().toString());
+
restartEvaluatorsBuilder.addRestartEvaluator(EvaluatorRestartInfo.createExpectedEvaluatorInfo(
+
ResourceEventImpl.newRecoveryBuilder().setIdentifier(container.getId().toString())
+
.setNodeId(container.getNodeId().toString()).setRackName(rackNameFormatter.getRackName(container))
+ .setResourceMemory(container.getResource().getMemory())
+
.setVirtualCores(container.getResource().getVirtualCores()).build()));
}
}
- return new EvaluatorRestartInfo(recoveredEvaluators, failedEvaluators);
+ return restartEvaluatorsBuilder.build();
}
/**
@@ -232,7 +243,6 @@ public final class YarnDriverRuntimeRestartManager
implements DriverRuntimeResta
.setState(ReefServiceProtos.State.FAILED)
.setExitCode(1)
.setDiagnostics("Container [" + evaluatorId + "] failed during
driver restart process.")
- .setIsFromPreviousDriver(true)
.build());
}
}