Repository: incubator-reef
Updated Branches:
refs/heads/master 6702ed586 -> 8598d4da6
[REEF-508] Determine restarts on YARN by using an environment variable
This addresses the issue by
* Using the environment variable to determine whether or not the job driver
has restarted.
* Falling back to the previous containers reported by the RM if using the
environment variable fails.
JIRA:
[REEF-508](https://issues.apache.org/jira/browse/REEF-508)
Pull Request:
This closes #335
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8598d4da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8598d4da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8598d4da
Branch: refs/heads/master
Commit: 8598d4da617d187b533230017bce2f7ded1b5c1c
Parents: 6702ed5
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 4 10:35:39 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 5 11:37:12 2015 -0700
----------------------------------------------------------------------
.../yarn/driver/YarnDriverRestartManager.java | 86 ++++++++++++++++++--
1 file changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8598d4da/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
index d6be373..43920e2 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
@@ -18,7 +18,11 @@
*/
package org.apache.reef.runtime.yarn.driver;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
@@ -52,7 +56,7 @@ public final class YarnDriverRestartManager implements DriverRestartManager {
private final ApplicationMasterRegistration registration;
private final DriverStatusManager driverStatusManager;
private final REEFEventHandlers reefEventHandlers;
- private List<Container> previousContainers;
+ private Set<Container> previousContainers;
@Inject
private YarnDriverRestartManager(@Parameter(YarnEvaluatorPreserver.class)
@@ -67,19 +71,77 @@ public final class YarnDriverRestartManager implements DriverRestartManager {
this.previousContainers = null;
}
+  /**
+   * Determines whether the application master has been restarted based on the container ID environment
+   * variable provided by YARN. If that fails, determines whether the application master is a restart
+   * based on the number of previous containers reported by YARN.
+   * @return true if the application master is a restarted instance, false otherwise.
+   */
   @Override
   public boolean isRestart() {
-    // TODO [REEF-508]: Determine restart based on environment variable as provided by YARN.
+    final String containerIdString = getContainerIdString();
+
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      LOG.log(Level.WARNING, "Container ID is null, determining restart based on previous containers.");
+      return this.isRestartByPreviousContainers();
+    }
+
+    final ApplicationAttemptId appAttemptID = getAppAttemptId(containerIdString);
+
+    if (appAttemptID == null) {
+      LOG.log(Level.WARNING, "applicationAttempt ID is null, determining restart based on previous containers.");
+      return this.isRestartByPreviousContainers();
+    }
+
+    // Parameterized so the message is only formatted when FINE logging is enabled.
+    LOG.log(Level.FINE, "Application attempt: {0}", appAttemptID.getAttemptId());
+
+    // YARN gives the first application attempt ID 1; any higher ID means the AM was restarted.
+    return appAttemptID.getAttemptId() > 1;
+  }
+
+  /**
+   * Reads the container ID that YARN sets in this container's environment.
+   * @return the container ID string, or null if it is unset or could not be read.
+   */
+  private static String getContainerIdString() {
+    try {
+      return System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key());
+    } catch (Exception e) {
+      // Hand the exception to the logger so its stack trace is preserved, not just its toString().
+      LOG.log(Level.WARNING, "Unable to get the container ID from the environment.", e);
+      return null;
+    }
+  }
+
+  /**
+   * Parses a container ID string and extracts the application attempt ID it belongs to.
+   * @param containerIdString the container ID, as read from the environment.
+   * @return the application attempt ID, or null if the container ID could not be parsed.
+   */
+  private static ApplicationAttemptId getAppAttemptId(final String containerIdString) {
+    try {
+      final ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+      return containerId.getApplicationAttemptId();
+    } catch (Exception e) {
+      // Best effort: presumably ConverterUtils throws on a malformed ID; the caller falls back on null.
+      // The exception is passed to the logger so its stack trace is preserved.
+      LOG.log(Level.WARNING, "Unable to get the applicationAttempt ID from the environment.", e);
+      return null;
+    }
+  }
+
+  /**
+   * Fallback restart check: makes sure the set of containers YARN reported from previous
+   * attempts is initialized, then treats any such container as evidence of a restart.
+   * @return true if at least one previous container was reported.
+   */
+  private boolean isRestartByPreviousContainers() {
+    initializeListOfPreviousContainers();
+    final boolean hasPreviousContainers = !this.previousContainers.isEmpty();
+    return hasPreviousContainers;
+  }
+
+  /**
+   * Initializes the set of previous containers as reported by YARN, if it has not been initialized yet.
+   * A null report from YARN is treated as an empty set, i.e. not a restart.
+   */
+  private synchronized void initializeListOfPreviousContainers() {
     if (this.previousContainers == null) {
-      this.previousContainers = this.registration.getRegistration().getContainersFromPreviousAttempts();
+      final List<Container> yarnPrevContainers =
+          this.registration.getRegistration().getContainersFromPreviousAttempts();
-      // If it's still null, create an empty list to indicate that it's not a restart.
-      if (this.previousContainers == null) {
-        this.previousContainers = new ArrayList<>();
+      // Check for null before copying: new HashSet<>(null) would throw NullPointerException.
+      if (yarnPrevContainers == null) {
+        // YARN reported nothing; an empty set indicates that it's not a restart.
+        this.previousContainers = new HashSet<>();
+      } else {
+        this.previousContainers = new HashSet<>(yarnPrevContainers);
       }
     }
-
-    return !this.previousContainers.isEmpty();
   }
@Override
@@ -87,6 +149,8 @@ public final class YarnDriverRestartManager implements DriverRestartManager {
final Set<String> recoveredEvaluators = new HashSet<>();
final Set<String> failedEvaluators = new HashSet<>();
+ this.initializeListOfPreviousContainers();
+
if (this.previousContainers != null && !this.previousContainers.isEmpty())
{
LOG.log(Level.INFO, "Driver restarted, with {0} previous containers",
this.previousContainers.size());
final Set<String> expectedContainers =
this.evaluatorPreserver.recoverEvaluators();
@@ -131,11 +195,21 @@ public final class YarnDriverRestartManager implements DriverRestartManager {
this.informAboutEvaluatorFailures(failedEvaluators);
}
+  /**
+   * Informs the driver status manager about the number of evaluators it should wait for to
+   * re-establish contact with the driver. Only the size of {@code evaluatorIds} is forwarded;
+   * the IDs themselves are not recorded.
+   * TODO [REEF-559]: Tighten previous evaluator ID checks by using entire set of evaluator IDs.
+   * @param evaluatorIds The set of evaluator IDs of evaluators expected to be alive.
+   */
   private void informAboutEvaluatorAlive(final Set<String> evaluatorIds) {
     // We will wait for these evaluators to contact us, so we do not need to record the entire container information.
     this.driverStatusManager.setNumPreviousContainers(evaluatorIds.size());
   }
+ /**
+ * Generate failure events for evaluators that cannot be recovered.
+ * @param evaluatorIds The set of evaluator IDs of evaluators that have
failed on restart.
+ */
private void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
for (String evaluatorId : evaluatorIds) {
LOG.log(Level.WARNING, "Container [" + evaluatorId +