YARN-5049. Extend NMStateStore to save queued container information. 
(Konstantinos Karanasos via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d464f4d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d464f4d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d464f4d1

Branch: refs/heads/HDFS-1312
Commit: d464f4d1c4dec483852fc8c0496787cba0af8f57
Parents: 7251bb9
Author: Arun Suresh <asur...@apache.org>
Authored: Wed May 11 19:10:17 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Wed May 11 19:10:17 2016 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  | 26 ++++++++-----
 .../queuing/QueuingContainerManagerImpl.java    | 41 ++++++++++++++++++++
 .../recovery/NMLeveldbStateStoreService.java    | 26 ++++++++++++-
 .../recovery/NMNullStateStoreService.java       |  4 ++
 .../recovery/NMStateStoreService.java           |  9 +++++
 .../recovery/NMMemoryStateStoreService.java     |  6 +++
 .../TestNMLeveldbStateStoreService.java         | 12 ++++++
 7 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 516ef90..4383d2b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -335,7 +335,6 @@ public class ContainerManagerImpl extends CompositeService 
implements
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
 
-  @SuppressWarnings("unchecked")
   private void recoverContainer(RecoveredContainerState rcs)
       throws IOException {
     StartContainerRequest req = rcs.getStartRequest();
@@ -350,14 +349,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
         + " with exit code " + rcs.getExitCode());
 
     if (context.getApplications().containsKey(appId)) {
-      Credentials credentials =
-          YarnServerSecurityUtils.parseCredentials(launchContext);
-      Container container = new ContainerImpl(getConfig(), dispatcher,
-          req.getContainerLaunchContext(),
-          credentials, metrics, token, context, rcs);
-      context.getContainers().put(containerId, container);
-      dispatcher.getEventHandler().handle(
-          new ApplicationContainerInitEvent(container));
+      recoverActiveContainer(launchContext, token, rcs);
     } else {
       if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
         LOG.warn(containerId + " has no corresponding application!");
@@ -367,6 +359,22 @@ public class ContainerManagerImpl extends CompositeService 
implements
     }
   }
 
+  /**
+   * Recreate a recovered container, register it and dispatch its init event.
+   */
+  @SuppressWarnings("unchecked")
+  protected void recoverActiveContainer(
+      ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
+      RecoveredContainerState rcs) throws IOException {
+    Credentials credentials = YarnServerSecurityUtils.parseCredentials(
+        launchContext);
+    Container container = new ContainerImpl(getConfig(), dispatcher,
+        launchContext, credentials, metrics, token, context, rcs);
+    context.getContainers().put(token.getContainerID(), container);
+    dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
+        container));
+  }
+
   private void waitForRecoveredContainers() throws InterruptedException {
     final int sleepMsec = 100;
     int waitIterations = 100;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
index 663dd3b..94d3172 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +57,8 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,6 +124,11 @@ public class QueuingContainerManagerImpl extends 
ContainerManagerImpl {
             hasResourcesAvailable(allocatedContInfo.getPti())) {
       startAllocatedContainer(allocatedContInfo);
     } else {
+      this.context.getNMStateStore().storeContainer(containerTokenIdentifier
+          .getContainerID(), request);
+      this.context.getNMStateStore().storeContainerQueued(
+          containerTokenIdentifier.getContainerID());
+
       if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
         queuedGuaranteedContainers.add(allocatedContInfo);
         // Kill running opportunistic containers to make space for
@@ -150,6 +158,7 @@ public class QueuingContainerManagerImpl extends 
ContainerManagerImpl {
         this.context.getQueuingContext().getKilledQueuedContainers().put(
             containerTokenId,
             "Queued container request removed by ApplicationMaster.");
+        this.context.getNMStateStore().storeContainerKilled(containerID);
       } else {
         // The container started execution in the meanwhile.
         try {
@@ -446,6 +455,38 @@ public class QueuingContainerManagerImpl extends 
ContainerManagerImpl {
     return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
   }
 
+  /**
+   * Recover a container, re-queueing it if it was queued and not killed.
+   */
+  @Override
+  protected void recoverActiveContainer(
+      ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
+      RecoveredContainerState rcs) throws IOException {
+    if (rcs.getStatus() ==
+        RecoveredContainerStatus.QUEUED && !rcs.getKilled()) {
+      LOG.info(token.getContainerID()
+          + " will be added to the queued containers.");
+
+      AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
+          token, rcs.getStartRequest(), token.getExecutionType(),
+              token.getResource(), getConfig());
+
+      this.context.getQueuingContext().getQueuedContainers().put(
+          token.getContainerID(), token);
+
+      if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
+        queuedGuaranteedContainers.add(allocatedContInfo);
+        // Kill running opportunistic containers to make space for
+        // guaranteed container.
+        killOpportunisticContainers(allocatedContInfo);
+      } else {
+        queuedOpportunisticContainers.add(allocatedContInfo);
+      }
+    } else {
+      super.recoverActiveContainer(launchContext, token, rcs);
+    }
+  }
+
   @VisibleForTesting
   public int getNumAllocatedGuaranteedContainers() {
     return allocatedGuaranteedContainers.size();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 6e9efe1..8bd2040 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -80,7 +80,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
   
   private static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 0);
+      .newInstance(2, 0);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -106,6 +106,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
   private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
   private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+  private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
   private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
       "/resourceChanged";
   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -239,8 +240,13 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
             StartContainerRequestProto.parseFrom(entry.getValue()));
       } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
         rcs.diagnostics = asString(entry.getValue());
-      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+      } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
         if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+          rcs.status = RecoveredContainerStatus.QUEUED;
+        }
+      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+        if ((rcs.status == RecoveredContainerStatus.REQUESTED)
+            || (rcs.status == RecoveredContainerStatus.QUEUED)) {
           rcs.status = RecoveredContainerStatus.LAUNCHED;
         }
       } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -284,6 +290,21 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException 
{
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeContainerQueued: containerId=" + containerId);
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_QUEUED_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), EMPTY_VALUE); // marker key only; value unused
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -417,6 +438,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
         db.write(batch);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 08b80e9..112095e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -75,6 +75,10 @@ public class NMNullStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException 
{
+    // Intentionally empty: this implementation records nothing.
+  }
+
+  @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index ccf1e70..57f35a4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -62,6 +62,7 @@ public abstract class NMStateStoreService extends 
AbstractService {
 
   public enum RecoveredContainerStatus {
     REQUESTED,
+    QUEUED,
     LAUNCHED,
     COMPLETED
   }
@@ -312,6 +313,14 @@ public abstract class NMStateStoreService extends 
AbstractService {
       StartContainerRequest startRequest) throws IOException;
 
   /**
+   * Record that a container has been queued at the NM.
+   * @param containerId the container ID
+   * @throws IOException if the queued state cannot be recorded
+   */
+  public abstract void storeContainerQueued(ContainerId containerId)
+      throws IOException;
+
+  /**
    * Record that a container has been launched
    * @param containerId the container ID
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 4652245..3c5edc0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -132,6 +132,12 @@ public class NMMemoryStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
+  public synchronized void storeContainerQueued(ContainerId containerId)
+      throws IOException {
+    // Guarded by the instance lock, like storeContainerDiagnostics.
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.QUEUED;
+  }
+
+  @Override
   public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d464f4d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index ccc9254..2f409c8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -280,6 +280,18 @@ public class TestNMLeveldbStateStoreService {
     // check whether the new container record is discarded
     assertEquals(1, recoveredContainers.size());
 
+    // queue the container, and verify recovered
+    stateStore.storeContainerQueued(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+
     // launch the container, add some diagnostics, and verify recovered
     StringBuilder diags = new StringBuilder();
     stateStore.storeContainerLaunched(containerId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to