Repository: incubator-reef
Updated Branches:
  refs/heads/master 6702ed586 -> 8598d4da6


[REEF-508] Determine restarts on YARN by using an environment variable

This addresses the issue by:
  * Using the container ID environment variable provided by YARN to determine
    whether or not the job driver has restarted.
  * Falling back to the previous containers reported by the RM if the
    environment variable cannot be read or parsed.

JIRA:
  [REEF-508](https://issues.apache.org/jira/browse/REEF-508)

Pull Request:
  This closes #335


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8598d4da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8598d4da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8598d4da

Branch: refs/heads/master
Commit: 8598d4da617d187b533230017bce2f7ded1b5c1c
Parents: 6702ed5
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 4 10:35:39 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 5 11:37:12 2015 -0700

----------------------------------------------------------------------
 .../yarn/driver/YarnDriverRestartManager.java   | 86 ++++++++++++++++++--
 1 file changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8598d4da/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
index d6be373..43920e2 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
@@ -18,7 +18,11 @@
  */
 package org.apache.reef.runtime.yarn.driver;
 
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
@@ -52,7 +56,7 @@ public final class YarnDriverRestartManager implements 
DriverRestartManager {
   private final ApplicationMasterRegistration registration;
   private final DriverStatusManager driverStatusManager;
   private final REEFEventHandlers reefEventHandlers;
-  private List<Container> previousContainers;
+  private Set<Container> previousContainers;
 
   @Inject
   private YarnDriverRestartManager(@Parameter(YarnEvaluatorPreserver.class)
@@ -67,19 +71,77 @@ public final class YarnDriverRestartManager implements 
DriverRestartManager {
     this.previousContainers = null;
   }
 
+  /**
+   * Determines whether the application master has been restarted based on the 
container ID environment
+   * variable provided by YARN. If that fails, determine whether the 
application master is a restart
+   * based on the number of previous containers reported by YARN.
+   * @return true if the application master is a restarted instance, false 
otherwise.
+   */
   @Override
   public boolean isRestart() {
-    // TODO [REEF-508]: Determine restart based on environment variable as 
provided by YARN.
+    final String containerIdString = getContainerIdString();
+
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      LOG.log(Level.WARNING, "Container ID is null, determining restart based 
on previous containers.");
+      return this.isRestartByPreviousContainers();
+    }
+
+    final ApplicationAttemptId appAttemptID = 
getAppAttemptId(containerIdString);
+
+    if (appAttemptID == null) {
+      LOG.log(Level.WARNING, "applicationAttempt ID is null, determining 
restart based on previous containers.");
+      return this.isRestartByPreviousContainers();
+    }
+
+    LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId());
+
+    return appAttemptID.getAttemptId() > 1;
+  }
+
+  private static String getContainerIdString() {
+    try {
+      return 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key());
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Unable to get the container ID from the 
environment, exception " +
+          e + " was thrown.");
+      return null;
+    }
+  }
+
+  private static ApplicationAttemptId getAppAttemptId(final String 
containerIdString) {
+    try {
+      final ContainerId containerId = 
ConverterUtils.toContainerId(containerIdString);
+      return containerId.getApplicationAttemptId();
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Unable to get the applicationAttempt ID from the 
environment, exception " +
+          e + " was thrown.");
+      return null;
+    }
+  }
+
+  /**
+   * Initializes the list of previous containers and determine whether or not 
this is an instance of restart
+   * based on information reported by the RM.
+   * @return true if previous containers is not empty.
+   */
+  private boolean isRestartByPreviousContainers() {
+    this.initializeListOfPreviousContainers();
+    return !this.previousContainers.isEmpty();
+  }
+
+  /**
+   * Initializes the list of previous containers as reported by YARN.
+   */
+  private synchronized void initializeListOfPreviousContainers() {
     if (this.previousContainers == null) {
-      this.previousContainers = 
this.registration.getRegistration().getContainersFromPreviousAttempts();
+      this.previousContainers = new 
HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts());
 
       // If it's still null, create an empty list to indicate that it's not a 
restart.
       if (this.previousContainers == null) {
-        this.previousContainers = new ArrayList<>();
+        this.previousContainers = new HashSet<>();
       }
     }
-
-    return !this.previousContainers.isEmpty();
   }
 
   @Override
@@ -87,6 +149,8 @@ public final class YarnDriverRestartManager implements 
DriverRestartManager {
     final Set<String> recoveredEvaluators = new HashSet<>();
     final Set<String> failedEvaluators = new HashSet<>();
 
+    this.initializeListOfPreviousContainers();
+
     if (this.previousContainers != null && !this.previousContainers.isEmpty()) 
{
       LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", 
this.previousContainers.size());
       final Set<String> expectedContainers = 
this.evaluatorPreserver.recoverEvaluators();
@@ -131,11 +195,21 @@ public final class YarnDriverRestartManager implements 
DriverRestartManager {
     this.informAboutEvaluatorFailures(failedEvaluators);
   }
 
+  /**
+   * Informs the driver status manager about the number of evaluators to wait 
for to reinitiate contact
+   * with the driver.
+   * TODO [REEF-559]: Tighten previous evaluator ID checks by using entire set 
of evaluator IDs.
+   * @param evaluatorIds The set of evaluator IDs of evaluators expected to be 
alive.
+   */
   private void informAboutEvaluatorAlive(final Set<String> evaluatorIds) {
     // We will wait for these evaluators to contact us, so we do not need to 
record the entire container information.
     this.driverStatusManager.setNumPreviousContainers(evaluatorIds.size());
   }
 
+  /**
+   * Generate failure events for evaluators that cannot be recovered.
+   * @param evaluatorIds The set of evaluator IDs of evaluators that have 
failed on restart.
+   */
   private void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
     for (String evaluatorId : evaluatorIds) {
       LOG.log(Level.WARNING, "Container [" + evaluatorId +

Reply via email to