YARN-7275. NM Statestore cleanup for Container updates. (Kartheek Muthyala via 
asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a50be1b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a50be1b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a50be1b8

Branch: refs/heads/YARN-1011
Commit: a50be1b8f432f50c940b66d12c7de87b95ea47c0
Parents: 8dbc890
Author: Arun Suresh <asur...@apache.org>
Authored: Mon Oct 16 13:08:52 2017 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Mon Oct 16 13:12:15 2017 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  |  9 +++
 .../container/ContainerEventType.java           |  4 +-
 .../container/ContainerImpl.java                | 43 +++++++++-----
 .../launcher/ContainersLauncher.java            |  2 -
 .../launcher/RecoverPausedContainerLaunch.java  | 38 +++++++-----
 .../launcher/RecoveredContainerLaunch.java      |  2 +-
 .../scheduler/ContainerScheduler.java           | 31 ++++++++++
 .../scheduler/ContainerSchedulerEventType.java  |  3 +-
 .../recovery/NMLeveldbStateStoreService.java    | 62 ++++++++++++++------
 .../recovery/NMNullStateStoreService.java       |  6 +-
 .../recovery/NMStateStoreService.java           | 10 ++--
 .../TestContainerManagerRecovery.java           |  4 ++
 .../recovery/NMMemoryStateStoreService.java     | 18 ++++--
 .../TestNMLeveldbStateStoreService.java         | 16 +++--
 14 files changed, 179 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 38eb636..7d5525a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -367,6 +368,13 @@ public class ContainerManagerImpl extends CompositeService 
implements
         }
         recoverContainer(rcs);
       }
+
+      //Dispatching the RECOVERY_COMPLETED event through the dispatcher
+      //so that all the paused, scheduled and queued containers will
+      //be scheduled for execution on availability of resources.
+      dispatcher.getEventHandler().handle(
+          new ContainerSchedulerEvent(null,
+              ContainerSchedulerEventType.RECOVERY_COMPLETED));
     } else {
       LOG.info("Not a recoverable state store. Nothing to recover.");
     }
@@ -480,6 +488,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
     Container container = new ContainerImpl(getConfig(), dispatcher,
         launchContext, credentials, metrics, token, context, rcs);
     context.getContainers().put(token.getContainerID(), container);
+    containerScheduler.recoverActiveContainer(container, rcs.getStatus());
     app.handle(new ApplicationContainerInitEvent(container));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index e28b37d..75e32e4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -46,5 +46,7 @@ public enum ContainerEventType {
   CONTAINER_RESUMED,
 
   // Producer: ContainerScheduler
-  CONTAINER_TOKEN_UPDATED
+  CONTAINER_TOKEN_UPDATED,
+
+  RECOVER_PAUSED_CONTAINER
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index df83789..17b24b4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -362,6 +362,9 @@ public class ContainerImpl implements Container {
     // From SCHEDULED State
     .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
         ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
+    .addTransition(ContainerState.SCHEDULED, ContainerState.PAUSED,
+        ContainerEventType.RECOVER_PAUSED_CONTAINER,
+        new RecoveredContainerTransition())
     .addTransition(ContainerState.SCHEDULED, 
ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new ExitedWithFailureTransition(true))
@@ -952,7 +955,10 @@ public class ContainerImpl implements Container {
       if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
         // try to recover a container that was previously launched
         launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+      } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
+        launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
       }
+
       containerLaunchStartTime = clock.getTime();
       dispatcher.getEventHandler().handle(
           new ContainersLauncherEvent(this, launcherEvent));
@@ -963,9 +969,6 @@ public class ContainerImpl implements Container {
   @SuppressWarnings("unchecked") // dispatcher not typed
   private void sendScheduleEvent() {
     if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
-      // Recovery is not supported for paused container so we raise the
-      // launch event which will proceed to kill the paused container instead
-      // of raising the schedule event.
       ContainersLauncherEventType launcherEvent;
       launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
       dispatcher.getEventHandler()
@@ -1060,17 +1063,15 @@ public class ContainerImpl implements Container {
       UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
       // Update the container token
       container.setContainerTokenIdentifier(updateEvent.getUpdatedToken());
-      if (updateEvent.isResourceChange()) {
-        try {
-          // Persist change in the state store.
-          container.context.getNMStateStore().storeContainerResourceChanged(
-              container.containerId,
-              container.getContainerTokenIdentifier().getVersion(),
-              container.getResource());
-        } catch (IOException e) {
-          LOG.warn("Could not store container [" + container.containerId
-              + "] resource change..", e);
-        }
+
+      try {
+        // Persist change in the state store.
+        container.context.getNMStateStore()
+            .storeContainerUpdateToken(container.containerId,
+                container.getContainerTokenIdentifier());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.containerId
+            + "] update..", e);
       }
     }
   }
@@ -1115,6 +1116,8 @@ public class ContainerImpl implements Container {
       if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
         container.sendFinishedEvents();
         return ContainerState.DONE;
+      } else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) 
{
+        return ContainerState.SCHEDULED;
       } else if (container.recoveredAsKilled &&
           container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
         // container was killed but never launched
@@ -1474,6 +1477,18 @@ public class ContainerImpl implements Container {
   }
 
   /**
+   * Transition from SCHEDULED state to PAUSED state on recovery
+   */
+  static class RecoveredContainerTransition extends ContainerTransition {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      container.sendContainerMonitorStartEvent();
+      container.wasLaunched = true;
+    }
+  }
+
+  /**
    * Transition from RUNNING or KILLING state to
    * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 9f6ef74..cfd5d6a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -140,8 +140,6 @@ public class ContainersLauncher extends AbstractService
         running.put(containerId, launch);
         break;
       case RECOVER_PAUSED_CONTAINER:
-        // Recovery for paused containers is not supported, thus here
-        // we locate any paused containers, and terminate them.
         app = context.getApplications().get(
             containerId.getApplicationAttemptId().getApplicationId());
         launch = new RecoverPausedContainerLaunch(context, getConfig(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
index 14cab9a..761fe3b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
@@ -30,7 +30,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
-import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
 
 import java.io.File;
 import java.io.IOException;
@@ -66,6 +66,8 @@ public class RecoverPausedContainerLaunch extends 
ContainerLaunch {
         containerId.getApplicationAttemptId().getApplicationId().toString();
     String containerIdStr = containerId.toString();
 
+    dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
+        ContainerEventType.RECOVER_PAUSED_CONTAINER));
     boolean notInterrupted = true;
     try {
       File pidFile = locatePidFile(appIdStr, containerIdStr);
@@ -73,16 +75,17 @@ public class RecoverPausedContainerLaunch extends 
ContainerLaunch {
         String pidPathStr = pidFile.getPath();
         pidFilePath = new Path(pidPathStr);
         exec.activateContainer(containerId, pidFilePath);
-        exec.signalContainer(new ContainerSignalContext.Builder()
-            .setContainer(container)
-            .setUser(container.getUser())
-            .setSignal(ContainerExecutor.Signal.KILL)
-            .build());
+        retCode = exec.reacquireContainer(
+            new ContainerReacquisitionContext.Builder()
+                .setContainer(container)
+                .setUser(container.getUser())
+                .setContainerId(containerId)
+                .build());
       } else {
         LOG.warn("Unable to locate pid file for container " + containerIdStr);
       }
 
-    } catch (InterruptedIOException e) {
+    } catch (InterruptedException | InterruptedIOException e) {
       LOG.warn("Interrupted while waiting for exit code from " + containerId);
       notInterrupted = false;
     } catch (IOException e) {
@@ -100,14 +103,21 @@ public class RecoverPausedContainerLaunch extends 
ContainerLaunch {
       }
     }
 
-    LOG.warn("Recovered container exited with a non-zero exit code "
-        + retCode);
-    this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
-        containerId,
-        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
-        "Container exited with a non-zero exit code " + retCode));
+    if (retCode != 0) {
+      LOG.warn("Recovered container exited with a non-zero exit code "
+          + retCode);
+      this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
+          containerId,
+          ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
+          "Container exited with a non-zero exit code " + retCode));
+      return retCode;
+    }
 
-    return retCode;
+    LOG.info("Recovered container " + containerId + " succeeded");
+    dispatcher.getEventHandler().handle(
+        new ContainerEvent(containerId,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+    return 0;
   }
 
   private File locatePidFile(String appIdStr, String containerIdStr) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
index 17ddd77..a3ccf00 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
@@ -72,7 +72,7 @@ public class RecoveredContainerLaunch extends ContainerLaunch 
{
     String containerIdStr = containerId.toString();
 
     dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
-        ContainerEventType.CONTAINER_LAUNCHED));
+        ContainerEventType.RECOVER_PAUSED_CONTAINER));
 
     boolean notInterrupted = true;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index e436822..76da37c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -41,6 +41,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 
 
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,6 +169,8 @@ public class ContainerScheduler extends AbstractService 
implements
     case SHED_QUEUED_CONTAINERS:
       shedQueuedOpportunisticContainers();
       break;
+    case RECOVERY_COMPLETED:
+      startPendingContainers(maxOppQueueLength <= 0);
     default:
       LOG.error("Unknown event arrived at ContainerScheduler: "
           + event.toString());
@@ -219,6 +222,34 @@ public class ContainerScheduler extends AbstractService 
implements
   }
 
   /**
+   * Populates auxiliary data structures used by the ContainerScheduler on
+   * recovery.
+   * @param container container recovered
+   * @param rcs Recovered Container status
+   */
+  public void recoverActiveContainer(Container container,
+      RecoveredContainerStatus rcs) {
+    ExecutionType execType =
+        container.getContainerTokenIdentifier().getExecutionType();
+    if (rcs == RecoveredContainerStatus.QUEUED
+        || rcs == RecoveredContainerStatus.PAUSED) {
+      if (execType == ExecutionType.GUARANTEED) {
+        queuedGuaranteedContainers.put(container.getContainerId(), container);
+      } else if (execType == ExecutionType.OPPORTUNISTIC) {
+        queuedOpportunisticContainers
+            .put(container.getContainerId(), container);
+      } else {
+        LOG.error(
+            "UnKnown execution type received " + container.getContainerId()
+                + ", execType " + execType);
+      }
+    } else if (rcs == RecoveredContainerStatus.LAUNCHED) {
+      runningContainers.put(container.getContainerId(), container);
+      utilizationTracker.addContainerResources(container);
+    }
+  }
+
+  /**
    * Return number of queued containers.
    * @return Number of queued containers.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index a9cbf74..294eddf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -27,5 +27,6 @@ public enum ContainerSchedulerEventType {
   UPDATE_CONTAINER,
   // Producer: Node HB response - RM has asked to shed the queue
   SHED_QUEUED_CONTAINERS,
-  CONTAINER_PAUSED
+  CONTAINER_PAUSED,
+  RECOVERY_COMPLETED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 2f9c0a7..c361d00 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import static org.fusesource.leveldbjni.JniDBFactory.asString;
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
@@ -47,11 +50,8 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -60,6 +60,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
+import 
org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@@ -120,8 +121,8 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
   private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
   private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
-  private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
-      "/resourceChanged";
+  private static final String CONTAINER_UPDATE_TOKEN_SUFFIX =
+      "/updateToken";
   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
   private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
   private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
@@ -290,9 +291,17 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
       } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
         rcs.status = RecoveredContainerStatus.COMPLETED;
         rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
-      } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
-        rcs.capability = new ResourcePBImpl(
-            ResourceProto.parseFrom(entry.getValue()));
+      } else if (suffix.equals(CONTAINER_UPDATE_TOKEN_SUFFIX)) {
+        ContainerTokenIdentifierProto tokenIdentifierProto =
+            ContainerTokenIdentifierProto.parseFrom(entry.getValue());
+        Token currentToken = rcs.getStartRequest().getContainerToken();
+        Token updatedToken = Token
+            .newInstance(tokenIdentifierProto.toByteArray(),
+                ContainerTokenIdentifier.KIND.toString(),
+                currentToken.getPassword().array(), currentToken.getService());
+        rcs.startRequest.setContainerToken(updatedToken);
+        rcs.capability = new 
ResourcePBImpl(tokenIdentifierProto.getResource());
+        rcs.version = tokenIdentifierProto.getVersion();
       } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
         rcs.setRemainingRetryAttempts(
             Integer.parseInt(asString(entry.getValue())));
@@ -374,6 +383,21 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
     }
   }
 
+  private void removeContainerQueued(ContainerId containerId)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("removeContainerQueued: containerId=" + containerId);
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_QUEUED_KEY_SUFFIX;
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   @Override
   public void storeContainerPaused(ContainerId containerId) throws IOException 
{
     if (LOG.isDebugEnabled()) {
@@ -429,6 +453,8 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
       LOG.debug("storeContainerLaunched: containerId=" + containerId);
     }
 
+    // Removing the container if queued for backward compatibility reasons
+    removeContainerQueued(containerId);
     String key = CONTAINERS_KEY_PREFIX + containerId.toString()
         + CONTAINER_LAUNCHED_KEY_SUFFIX;
     try {
@@ -439,24 +465,25 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
-  public void storeContainerResourceChanged(ContainerId containerId,
-      int containerVersion, Resource capability) throws IOException {
+  public void storeContainerUpdateToken(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("storeContainerResourceChanged: containerId=" + containerId
-          + ", capability=" + capability);
+      LOG.debug("storeContainerUpdateToken: containerId=" + containerId);
     }
 
-    String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString()
-        + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX;
+    String keyUpdateToken = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_UPDATE_TOKEN_SUFFIX;
     String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString()
         + CONTAINER_VERSION_KEY_SUFFIX;
+
     try {
       WriteBatch batch = db.createWriteBatch();
       try {
         // New value will overwrite old values for the same key
-        batch.put(bytes(keyResChng),
-            ProtoUtils.convertToProtoFormat(capability).toByteArray());
-        batch.put(bytes(keyVersion), 
bytes(Integer.toString(containerVersion)));
+        batch.put(bytes(keyUpdateToken),
+            containerTokenIdentifier.getProto().toByteArray());
+        batch.put(bytes(keyVersion),
+            bytes(Integer.toString(containerTokenIdentifier.getVersion())));
         db.write(batch);
       } finally {
         batch.close();
@@ -552,6 +579,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_UPDATE_TOKEN_SUFFIX));
         List<String> unknownKeysForContainer = containerUnknownKeySuffixes
             .removeAll(containerId);
         for (String unknownKeySuffix : unknownKeysForContainer) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index d1d0696..ca6d018 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -28,12 +28,12 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 // The state store to use when state isn't being stored
@@ -99,8 +99,8 @@ public class NMNullStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
-  public void storeContainerResourceChanged(ContainerId containerId,
-      int version, Resource capability) throws IOException {
+  public void storeContainerUpdateToken(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 999d2d9..5e2b8a5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 
@@ -429,14 +430,13 @@ public abstract class NMStateStoreService extends 
AbstractService {
       throws IOException;
 
   /**
-   * Record that a container resource has been changed
+   * Record that a container has been updated
    * @param containerId the container ID
-   * @param containerVersion the container version
-   * @param capability the container resource capability
+   * @param containerTokenIdentifier container token identifier
    * @throws IOException
    */
-  public abstract void storeContainerResourceChanged(ContainerId containerId,
-      int containerVersion, Resource capability) throws IOException;
+  public abstract void storeContainerUpdateToken(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenIdentifier) throws IOException;
 
   /**
    * Record that a container has completed

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 5ec0ae6..0e629d4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -405,6 +405,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     stateStore.start();
     context = createContext(conf, stateStore);
     ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+    ((NMContext) context).setContainerManager(cm);
     cm.init(conf);
     cm.start();
     // add an application by starting a container
@@ -430,6 +431,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     cm.stop();
     context = createContext(conf, stateStore);
     cm = createContainerManager(context);
+    ((NMContext) context).setContainerManager(cm);
     cm.init(conf);
     cm.start();
     assertEquals(1, context.getApplications().size());
@@ -448,6 +450,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     stateStore.start();
     context = createContext(conf, stateStore);
     ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+    ((NMContext) context).setContainerManager(cm);
     cm.init(conf);
     cm.start();
 
@@ -473,6 +476,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     cm.stop();
     context = createContext(conf, stateStore);
     cm = createContainerManager(context);
+    ((NMContext) context).setContainerManager(cm);
     cm.init(conf);
     cm.start();
     assertEquals(1, context.getApplications().size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 59a225a..eb222cd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -33,12 +33,13 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@@ -175,12 +176,17 @@ public class NMMemoryStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
-  public synchronized void storeContainerResourceChanged(
-      ContainerId containerId, int version, Resource capability)
-      throws IOException {
+  public void storeContainerUpdateToken(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
-    rcs.capability = capability;
-    rcs.version = version;
+    rcs.capability = containerTokenIdentifier.getResource();
+    rcs.version = containerTokenIdentifier.getVersion();
+    Token currentToken = rcs.getStartRequest().getContainerToken();
+    Token updatedToken = Token
+        .newInstance(containerTokenIdentifier.getBytes(),
+            ContainerTokenIdentifier.KIND.toString(),
+            currentToken.getPassword().array(), currentToken.getService());
+    rcs.startRequest.setContainerToken(updatedToken);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 8c13356..1ff2119 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -307,13 +307,18 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(1, recoveredContainers.size());
 
     // increase the container size, and verify recovered
-    stateStore.storeContainerResourceChanged(containerId, 2,
-        Resource.newInstance(2468, 4));
+    ContainerTokenIdentifier updateTokenIdentifier =
+        new ContainerTokenIdentifier(containerId, "host", "user",
+            Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
+            Priority.newInstance(7), 13579);
+
+    stateStore
+        .storeContainerUpdateToken(containerId, updateTokenIdentifier);
     restartStateStore();
     recoveredContainers = stateStore.loadContainersState();
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
-    assertEquals(2, rcs.getVersion());
+    assertEquals(0, rcs.getVersion());
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
@@ -330,7 +335,9 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertTrue(rcs.getKilled());
-    assertEquals(containerReq, rcs.getStartRequest());
+    ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
+        
.newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken());
+    assertEquals(updateTokenIdentifier, tokenReadFromRequest);
     assertEquals(diags.toString(), rcs.getDiagnostics());
 
     // add yet more diags, mark container completed, and verify recovered
@@ -344,7 +351,6 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
     assertEquals(21, rcs.getExitCode());
     assertTrue(rcs.getKilled());
-    assertEquals(containerReq, rcs.getStartRequest());
     assertEquals(diags.toString(), rcs.getDiagnostics());
 
     // store remainingRetryAttempts, workDir and logDir


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to