http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 73d9e5c..641ef64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -34,11 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -64,8 +68,11 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistrib
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -75,13 +82,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -91,8 +102,10 @@ public class TestOpportunisticContainerAllocatorAMService {
 
   private static final int GB = 1024;
 
-  @Test(timeout = 60000)
-  public void testNodeRemovalDuringAllocate() throws Exception {
+  private MockRM rm;
+
+  @Before
+  public void createAndStartRM() {
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
     YarnConfiguration conf = new YarnConfiguration(csConf);
@@ -102,8 +115,445 @@ public class TestOpportunisticContainerAllocatorAMService {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setInt(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
-    MockRM rm = new MockRM(conf);
+    rm = new MockRM(conf);
     rm.start();
+  }
+
+  @After
+  public void stopRM() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
+    RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode3)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode4)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
+    // All nodes 1 - 4 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+    MockNM sameHostDiffNode = null;
+    for (NodeId n : nodes.keySet()) {
+      if (n.getHost().equals(allocNode.getNodeId().getHost()) &&
+          n.getPort() != allocNode.getNodeId().getPort()) {
+        sameHostDiffNode = nodes.get(n);
+      }
+    }
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    // Node on same host should not result in allocation
+    sameHostDiffNode.nodeHeartbeat(true);
+    Thread.sleep(200);
+    allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+
+    // Verify Metrics After OPP allocation (Nothing should change again)
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    // Send Promotion req again... this should result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Send Promotion req again with incorrect version...
+    // this should also result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(1,
+            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Ensure after correct node heartbeats, we should get the allocation
+    allocNode.nodeHeartbeat(true);
+    Thread.sleep(200);
+    allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Container uc =
+        allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
+    Assert.assertEquals(uc.getId(), container.getId());
+    Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
+
+    // Verify Metrics After OPP allocation :
+    // Allocated cores+mem should have increased, available should decrease
+    verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+    Thread.sleep(200);
+
+    // Verify that the container is still in ACQUIRED state wrt the RM.
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+        uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
+    Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+
+    // Now demote the container back..
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(),
+            uc.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC)));
+    // This should happen in the same heartbeat..
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    uc = allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType());
+    Assert.assertEquals(uc.getId(), container.getId());
+    Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
+
+    // Verify Metrics After OPP allocation :
+    // Everything should have reverted to what it was
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+  }
+
+  @Test(timeout = 60000)
+  public void testContainerPromoteAfterContainerStart() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+            container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    // Verify Metrics After OPP allocation (Nothing should change again)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    // Send Promotion req again... this should result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Container uc =
+        allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
+    Assert.assertEquals(uc.getId(), container.getId());
+    Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
+
+    // Verify that the Container is still in RUNNING state wrt RM..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Verify Metrics After OPP allocation :
+    // Allocated cores+mem should have increased, available should decrease
+    verifyMetrics(metrics, 6144, 6, 2048, 2, 2);
+  }
+
+  @Test(timeout = 600000)
+  public void testContainerPromoteAfterContainerComplete() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+            container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Container Completed in the NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container has been removed..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+            container.getId());
+    Assert.assertNull(rmContainer);
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    // Send Promotion req... this should result in update error
+    // Since the container doesn't exist anymore..
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    Assert.assertEquals(1,
+        allocateResponse.getCompletedContainersStatuses().size());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getCompletedContainersStatuses().get(0)
+            .getContainerId());
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("INVALID_CONTAINER_ID",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Verify Metrics After OPP allocation (Nothing should change again)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+  }
+
+  private void verifyMetrics(QueueMetrics metrics, long availableMB,
+      int availableVirtualCores, long allocatedMB,
+      int allocatedVirtualCores, int allocatedContainers) {
+    Assert.assertEquals(availableMB, metrics.getAvailableMB());
+    Assert.assertEquals(availableVirtualCores, metrics.getAvailableVirtualCores());
+    Assert.assertEquals(allocatedMB, metrics.getAllocatedMB());
+    Assert.assertEquals(allocatedVirtualCores, metrics.getAllocatedVirtualCores());
+    Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
+  }
+
+  @Test(timeout = 60000)
+  public void testNodeRemovalDuringAllocate() throws Exception {
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
     nm1.registerNode();

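The tests added above drive container promotion and demotion entirely through the AM allocate protocol: an UpdateContainerRequest carrying the container's current version is sent, and the RM answers either through AllocateResponse#getUpdatedContainers() (with the version bumped by one) or through getUpdateErrors(). A minimal sketch, not part of this patch, of how an AM-side caller could build such requests; the helper class name is illustrative, and only record types already imported by the test are used.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;

/** Illustrative helper, not part of the patch. */
public final class ContainerUpdateRequests {

  /** Ask the RM to promote an OPPORTUNISTIC container to GUARANTEED. */
  public static List<UpdateContainerRequest> promote(Container container) {
    return Collections.singletonList(
        UpdateContainerRequest.newInstance(
            container.getVersion(),  // stale versions trigger
                                     // INCORRECT_CONTAINER_VERSION_ERROR
            container.getId(),
            ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
            null,                    // capability unchanged
            ExecutionType.GUARANTEED));
  }

  /** Ask the RM to demote a GUARANTEED container back to OPPORTUNISTIC. */
  public static List<UpdateContainerRequest> demote(Container container) {
    return Collections.singletonList(
        UpdateContainerRequest.newInstance(
            container.getVersion(),
            container.getId(),
            ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
            null,
            ExecutionType.OPPORTUNISTIC));
  }

  private ContainerUpdateRequests() {
  }
}
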
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 497c6d0..786cc50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -478,7 +479,7 @@ public class TestRMAppAttemptTransitions {
     assertEquals(expectedState, applicationAttempt.getAppAttemptState());
     verify(scheduler, times(expectedAllocateCount)).allocate(
         any(ApplicationAttemptId.class), any(List.class), any(List.class),
-        any(List.class), any(List.class), any(List.class), any(List.class));
+        any(List.class), any(List.class), any(ContainerUpdates.class));
 
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertNull(applicationAttempt.getMasterContainer());
@@ -499,7 +500,7 @@ public class TestRMAppAttemptTransitions {
     verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
     verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
         any(List.class), any(List.class), any(List.class), any(List.class),
-        any(List.class), any(List.class));
+        any(ContainerUpdates.class));
     verify(nmTokenManager).clearNodeSetForAttempt(
       applicationAttempt.getAppAttemptId());
   }
@@ -526,7 +527,7 @@ public class TestRMAppAttemptTransitions {
   }
 
   /**
-   * {@link RMAppAttemptState#LAUNCH}
+   * {@link RMAppAttemptState#LAUNCHED}
    */
   private void testAppAttemptLaunchedState(Container container) {
     assertEquals(RMAppAttemptState.LAUNCHED, 
@@ -649,8 +650,8 @@ public class TestRMAppAttemptTransitions {
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(container));
     when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
-        any(List.class), any(List.class), any(List.class), any(List.class),
-        any(List.class))).
+        any(List.class), any(List.class), any(List.class),
+        any(ContainerUpdates.class))).
     thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
     when(scheduler.getRMContainer(container.getId())).
@@ -1129,8 +1130,9 @@ public class TestRMAppAttemptTransitions {
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(amContainer));
     when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
-        any(List.class), any(List.class), any(List.class), any(List.class),
-        any(List.class))).thenReturn(allocation);
+        any(List.class), any(List.class), any(List.class),
+        any(ContainerUpdates.class)))
+        .thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
     when(scheduler.getRMContainer(amContainer.getId())).thenReturn(rmContainer);
 
@@ -1610,7 +1612,8 @@ public class TestRMAppAttemptTransitions {
     YarnScheduler mockScheduler = mock(YarnScheduler.class);
     when(mockScheduler.allocate(any(ApplicationAttemptId.class),
         any(List.class), any(List.class), any(List.class), any(List.class),
-        any(List.class), any(List.class))).thenAnswer(new Answer<Allocation>() {
+        any(ContainerUpdates.class)))
+        .thenAnswer(new Answer<Allocation>() {
 
           @SuppressWarnings("rawtypes")
           @Override

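The mock adjustments above track a narrower scheduler allocate signature: the two trailing lists (formerly the container increase and decrease requests) are folded into a single ContainerUpdates argument. A minimal sketch of stubbing the new six-argument form, assuming the Mockito 1.x Matchers already used in this test; the helper class name is illustrative.

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

/** Illustrative helper, not part of the patch. */
final class SchedulerMocks {

  @SuppressWarnings({"unchecked", "rawtypes"})
  static YarnScheduler schedulerReturning(Allocation allocation) {
    YarnScheduler scheduler = mock(YarnScheduler.class);
    // appAttemptId, ask, release, blacklistAdditions, blacklistRemovals, updates
    when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
        any(List.class), any(List.class), any(List.class),
        any(ContainerUpdates.class)))
        .thenReturn(allocation);
    return scheduler;
  }

  private SchedulerMocks() {
  }
}
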
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index e737a84..893f802 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,7 +115,8 @@ public class TestRMContainerImpl {
         YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
         true);
     when(rmContext.getYarnConfiguration()).thenReturn(conf);
-    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
         nodeId, "user", rmContext);
 
     assertEquals(RMContainerState.NEW, rmContainer.getState());
@@ -216,7 +218,8 @@ public class TestRMContainerImpl {
     when(rmContext.getYarnConfiguration()).thenReturn(conf);
     when(rmContext.getRMApps()).thenReturn(appMap);
     
-    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
         nodeId, "user", rmContext);
 
     assertEquals(RMContainerState.NEW, rmContainer.getState());

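As the constructor calls above show, RMContainerImpl now takes a SchedulerRequestKey, extracted from the container itself, as its second argument. A minimal sketch of the updated construction; the helper class name is illustrative and the fixture objects (container, attempt id, node id, RMContext) are assumed to come from the surrounding test.

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

/** Illustrative helper, not part of the patch. */
final class RMContainers {

  static RMContainer create(Container container,
      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
      RMContext rmContext) {
    // The scheduler key is derived from the container rather than passed
    // separately, so call sites only gain the extractFrom(...) argument.
    return new RMContainerImpl(container,
        SchedulerRequestKey.extractFrom(container), appAttemptId,
        nodeId, user, rmContext);
  }

  private RMContainers() {
  }
}
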
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 27860a6..43bdc8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
     ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -160,7 +161,9 @@ import com.google.common.collect.Sets;
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
   private final int GB = 1024;
-  
+  private final static ContainerUpdates NULL_UPDATE_REQUESTS =
+      new ContainerUpdates();
+
   private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
   private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
   private static final String A1 = A + ".a1";
@@ -738,12 +741,12 @@ public class TestCapacityScheduler {
     // Verify the blacklist can be updated independent of requesting containers
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null, null, null);
+        Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
         .isPlaceBlacklisted(host));
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host), null, null);
+        Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
         .isPlaceBlacklisted(host));
     rm.stop();
@@ -839,7 +842,7 @@ public class TestCapacityScheduler {
     cs.allocate(appAttemptId1,
         Collections.<ResourceRequest>singletonList(r1),
         Collections.<ContainerId>emptyList(),
-        null, null, null, null);
+        null, null, NULL_UPDATE_REQUESTS);
 
     //And this will result in container assignment for app1
     CapacityScheduler.schedule(cs);
@@ -856,7 +859,7 @@ public class TestCapacityScheduler {
     cs.allocate(appAttemptId2,
         Collections.<ResourceRequest>singletonList(r2),
         Collections.<ContainerId>emptyList(),
-        null, null, null, null);
+        null, null, NULL_UPDATE_REQUESTS);
 
     //In this case we do not perform container assignment because we want to
     //verify re-ordering based on the allocation alone
@@ -2981,7 +2984,8 @@ public class TestCapacityScheduler {
 
     Allocation allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null, null, null);
+            Collections.<ContainerId> emptyList(), null, null,
+            NULL_UPDATE_REQUESTS);
 
     Assert.assertNotNull(attempt);
 
@@ -2997,7 +3001,8 @@ public class TestCapacityScheduler {
 
     allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null, null, null);
+            Collections.<ContainerId> emptyList(), null, null,
+            NULL_UPDATE_REQUESTS);
 
     // All resources should be sent as headroom
     Assert.assertEquals(newResource, allocate.getResourceLimit());
@@ -3504,7 +3509,7 @@ public class TestCapacityScheduler {
       cs.allocate(appAttemptId3,
           Collections.<ResourceRequest>singletonList(y1Req),
           Collections.<ContainerId>emptyList(),
-          null, null, null, null);
+          null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
     assertEquals("Y1 Used Resource should be 4 GB", 4 * GB,
@@ -3518,7 +3523,7 @@ public class TestCapacityScheduler {
       cs.allocate(appAttemptId1,
           Collections.<ResourceRequest>singletonList(x1Req),
           Collections.<ContainerId>emptyList(),
-          null, null, null, null);
+          null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
     assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
@@ -3531,7 +3536,7 @@ public class TestCapacityScheduler {
     cs.allocate(appAttemptId2,
         Collections.<ResourceRequest>singletonList(x2Req),
         Collections.<ContainerId>emptyList(),
-        null, null, null, null);
+        null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     assertEquals("X2 Used Resource should be 0", 0,
         cs.getQueue("x2").getUsedResources().getMemorySize());
@@ -3543,7 +3548,7 @@ public class TestCapacityScheduler {
     cs.allocate(appAttemptId1,
         Collections.<ResourceRequest>singletonList(x1Req),
         Collections.<ContainerId>emptyList(),
-        null, null, null, null);
+        null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
         cs.getQueue("x1").getUsedResources().getMemorySize());
@@ -3557,7 +3562,7 @@ public class TestCapacityScheduler {
       cs.allocate(appAttemptId3,
           Collections.<ResourceRequest>singletonList(y1Req),
           Collections.<ContainerId>emptyList(),
-          null, null, null, null);
+          null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
     assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
@@ -3616,7 +3621,7 @@ public class TestCapacityScheduler {
     //This will allocate for app1
     cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
         Collections.<ContainerId>emptyList(),
-        null, null, null, null).getContainers().size();
+        null, null, NULL_UPDATE_REQUESTS).getContainers().size();
     CapacityScheduler.schedule(cs);
     ResourceRequest r2 = null;
     for (int i =0; i < 13; i++) {
@@ -3625,7 +3630,7 @@ public class TestCapacityScheduler {
       cs.allocate(appAttemptId2,
           Collections.<ResourceRequest>singletonList(r2),
           Collections.<ContainerId>emptyList(),
-          null, null, null, null);
+          null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
     assertEquals("A Used Resource should be 2 GB", 2 * GB,
@@ -3638,11 +3643,11 @@ public class TestCapacityScheduler {
         ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
         Collections.<ContainerId>emptyList(),
-        null, null, null, null).getContainers().size();
+        null, null, NULL_UPDATE_REQUESTS).getContainers().size();
     CapacityScheduler.schedule(cs);
 
     cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2),
-        Collections.<ContainerId>emptyList(), null, null, null, null);
+        Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     //Check blocked Resource
     assertEquals("A Used Resource should be 2 GB", 2 * GB,

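The recurring edit in this file replaces the trailing pair of null arguments to CapacityScheduler#allocate with a shared NULL_UPDATE_REQUESTS constant, an empty ContainerUpdates. A minimal sketch of the resulting call shape, with the six-argument signature inferred from the calls in this patch; the helper class name is illustrative.

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;

/** Illustrative helper, not part of the patch. */
final class AllocateCalls {

  // An empty ContainerUpdates stands in for the removed increase/decrease lists.
  private static final ContainerUpdates NULL_UPDATE_REQUESTS =
      new ContainerUpdates();

  /** A heartbeat-style allocate with no asks, releases, or blacklist changes. */
  static Allocation emptyAllocate(CapacityScheduler cs,
      ApplicationAttemptId appAttemptId) {
    return cs.allocate(appAttemptId,
        Collections.<ResourceRequest>emptyList(),
        Collections.<ContainerId>emptyList(),
        null, null, NULL_UPDATE_REQUESTS);
  }

  private AllocateCalls() {
  }
}
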
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index a6ae0c2..cf91841 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -280,7 +282,8 @@ public class TestChildQueueOrder {
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
     Container container=TestUtils.getMockContainer(containerId, 
         node_0.getNodeID(), Resources.createResource(1*GB), priority);
-    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
         node_0.getNodeID(), "user", rmContext);
 
     // Assign {1,2,3,4} 1GB containers respectively to queues

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index c9eb8b3..f9bf89d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -923,13 +923,15 @@ public class TestReservations {
     Container container = TestUtils.getMockContainer(containerId,
         node_1.getNodeID(), Resources.createResource(2*GB),
         priorityMap.getPriority());
-    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
         node_1.getNodeID(), "user", rmContext);
 
     Container container_1 = TestUtils.getMockContainer(containerId,
         node_0.getNodeID(), Resources.createResource(1*GB),
         priorityMap.getPriority());
-    RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId,
+    RMContainer rmContainer_1 = new RMContainerImpl(container_1,
+        SchedulerRequestKey.extractFrom(container_1), appAttemptId,
         node_0.getNodeID(), "user", rmContext);
 
     // no reserved containers
@@ -996,7 +998,8 @@ public class TestReservations {
     Container container = TestUtils.getMockContainer(containerId,
         node_1.getNodeID(), Resources.createResource(2*GB),
         priorityMap.getPriority());
-    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
         node_1.getNodeID(), "user", rmContext);
 
     // nothing reserved

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 992b75d..8e8267f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -41,6 +41,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -71,6 +74,8 @@ public class FairSchedulerTestBase {
   public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
   private static final int SLEEP_DURATION = 10;
   private static final int SLEEP_RETRIES = 1000;
+  final static ContainerUpdates NULL_UPDATE_REQUESTS =
+      new ContainerUpdates();
 
   /**
    * The list of nodes added to the cluster using the {@link #addNode} method.
@@ -181,7 +186,8 @@ public class FairSchedulerTestBase {
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
 
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
     return id;
   }
   
@@ -207,7 +213,8 @@ public class FairSchedulerTestBase {
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
 
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
     return id;
   }
 
@@ -229,7 +236,8 @@ public class FairSchedulerTestBase {
       ResourceRequest request, ApplicationAttemptId attId) {
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ask.add(request);
-    scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
   }
 
   protected void createApplicationWithAMResource(ApplicationAttemptId attId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index 4cc99b2..8bb06e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -119,7 +119,8 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     List<ResourceRequest> ask = new ArrayList<>();
     ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
     scheduler.allocate(
-        appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
+        appAttemptId, ask, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
     FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
 
     triggerSchedulingAttempt();
@@ -157,7 +158,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
         createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
     ask.add(request);
     scheduler.allocate(appAttemptId, ask,
-        new ArrayList<ContainerId>(), null, null, null, null);
+        new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
     triggerSchedulingAttempt();
 
     FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@@ -169,7 +170,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     ask.clear();
     ask.add(request);
     scheduler.allocate(appAttemptId, ask,
-        new ArrayList<ContainerId>(), null, null, null, null);
+        new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
     triggerSchedulingAttempt();
 
     checkAppConsumption(app, Resources.createResource(2048,2));
@@ -335,7 +336,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     ask1.add(request1);
     ask1.add(request2);
     scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
-        null, null);
+        NULL_UPDATE_REQUESTS);
 
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5aa1e2d..b1e412b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -93,6 +93,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -124,6 +127,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   private final int GB = 1024;
   private final static String ALLOC_FILE =
       new File(TEST_DIR, "test-queues").getAbsolutePath();
+  private final static ContainerUpdates NULL_UPDATE_REQUESTS =
+      new ContainerUpdates();
 
   @Before
   public void setUp() throws IOException {
@@ -1257,7 +1262,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
 
     scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
-            null, null, null);
+            null, NULL_UPDATE_REQUESTS);
 
     ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
     scheduler.update();
@@ -2111,7 +2116,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ResourceRequest request1 =
         createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
     ask1.add(request1);
-    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
 
     // Second ask, queue2 requests 1 large.
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
@@ -2121,7 +2127,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
             ResourceRequest.ANY, 1, 1, false);
     ask2.add(request2);
     ask2.add(request3);
-    scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
 
     // Third ask, queue2 requests 2 small (minReqSize).
     List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
@@ -2131,7 +2138,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
             ResourceRequest.ANY, 2, 2, true);
     ask3.add(request4);
     ask3.add(request5);
-    scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
 
     scheduler.update();
 
@@ -2665,7 +2673,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     
     // Complete container
     scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
-        Arrays.asList(containerId), null, null, null, null);
+        Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS);
     assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
     assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
     
@@ -2757,7 +2765,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
 
     scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
-        null, null, null);
+        null, NULL_UPDATE_REQUESTS);
     
     // node 1 checks in
     scheduler.update();
@@ -3203,7 +3211,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         createResourceRequest(1024, node1.getHostName(), 1, 0, true),
         createResourceRequest(1024, "rack1", 1, 0, true),
         createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
-    scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(attId1, update, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
     
     // then node2 should get the container
     scheduler.handle(node2UpdateEvent);
@@ -3250,7 +3259,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
         1, 1, false);
     scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
-        new ArrayList<ContainerId>(), null, null, null, null);
+        new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
 
     scheduler.handle(nodeUpdateEvent);
     assertEquals(0, app.getReservedContainers().size());
@@ -4275,7 +4284,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     ask1.add(request1);
     scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null,
-        null, null, null);
+        null, NULL_UPDATE_REQUESTS);
 
     String hostName = "127.0.0.1";
     RMNode node1 = MockNodes.newNodeInfo(1,
@@ -4351,11 +4360,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Verify the blacklist can be updated independent of requesting containers
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null, null, null);
+        Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     assertTrue(app.isPlaceBlacklisted(host));
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host), null, null);
+        Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     assertFalse(scheduler.getSchedulerApp(appAttemptId)
         .isPlaceBlacklisted(host));
 
@@ -4365,7 +4374,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Verify a container does not actually get placed on the blacklisted host
     scheduler.allocate(appAttemptId, update,
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null, null, null);
+        Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     assertTrue(app.isPlaceBlacklisted(host));
     scheduler.update();
     scheduler.handle(updateEvent);
@@ -4375,7 +4384,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Verify a container gets placed on the empty blacklist
     scheduler.allocate(appAttemptId, update,
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host), null, null);
+        Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     assertFalse(app.isPlaceBlacklisted(host));
     createSchedulingRequest(GB, "root.default", "user", 1);
     scheduler.update();
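
Note on the call-shape change in the TestFairScheduler hunks above: the trailing null arguments of scheduler.allocate() are replaced by a single ContainerUpdates parameter, with an empty ContainerUpdates() standing in for "no update requests". The following standalone sketch is not part of the patch; the class and method names are illustrative only, and it assumes the post-patch six-argument signature shown in the hunks.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

// Illustrative sketch only, not part of the patch.
final class AllocateCallShapeSketch {
  static void allocateWithNoUpdates(FairScheduler scheduler,
      ApplicationAttemptId attemptId, List<ResourceRequest> asks) {
    scheduler.allocate(attemptId, asks,
        new ArrayList<ContainerId>(),  // no containers to release
        null, null,                    // no blacklist additions / removals
        new ContainerUpdates());       // empty set of container update requests
  }
}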

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index bfbc7bd..3f97b59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@@ -119,7 +120,10 @@ public class TestFifoScheduler {
   private static Configuration conf;
   private static final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
-  
+
+  private final static ContainerUpdates NULL_UPDATE_REQUESTS =
+      new ContainerUpdates();
+
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
@@ -274,7 +278,8 @@ public class TestFifoScheduler {
     ask.add(nodeLocal);
     ask.add(rackLocal);
     ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
 
     NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
 
@@ -368,7 +373,8 @@ public class TestFifoScheduler {
     ask.add(nodeLocal);
     ask.add(rackLocal);
     ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
+    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
 
     // Before the node update event, there are one local request
     Assert.assertEquals(1, nodeLocal.getNumContainers());
@@ -944,7 +950,7 @@ public class TestFifoScheduler {
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1,
         RMNodeLabelsManager.NO_LABEL));
     fs.allocate(appAttemptId1, ask1, emptyId,
-        Collections.singletonList(host_1_0), null, null, null);
+        Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS);
 
     // Trigger container assignment
     fs.handle(new NodeUpdateSchedulerEvent(n3));
@@ -952,14 +958,16 @@ public class TestFifoScheduler {
     // Get the allocation for the application and verify no allocation on
     // blacklist node
     Allocation allocation1 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
 
     Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
 
     // verify host_1_1 can get allocated as not in blacklist
     fs.handle(new NodeUpdateSchedulerEvent(n4));
     Allocation allocation2 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
     List<Container> containerList = allocation2.getContainers();
     for (Container container : containerList) {
@@ -974,29 +982,33 @@ public class TestFifoScheduler {
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
     fs.allocate(appAttemptId1, ask2, emptyId,
-        Collections.singletonList("rack0"), null, null, null);
+        Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS);
 
     // verify n1 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n1));
     Allocation allocation3 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
 
     // verify n2 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n2));
     Allocation allocation4 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
 
     // verify n3 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n3));
     Allocation allocation5 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
 
     fs.handle(new NodeUpdateSchedulerEvent(n4));
     Allocation allocation6 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
 
     containerList = allocation6.getContainers();
@@ -1055,25 +1067,29 @@ public class TestFifoScheduler {
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
-    fs.allocate(appAttemptId1, ask1, emptyId, null, null, null, null);
+    fs.allocate(appAttemptId1, ask1, emptyId,
+        null, null, NULL_UPDATE_REQUESTS);
 
     // Ask for a 2 GB container for app 2
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
-    fs.allocate(appAttemptId2, ask2, emptyId, null, null, null, null);
+    fs.allocate(appAttemptId2, ask2, emptyId,
+        null, null, NULL_UPDATE_REQUESTS);
 
     // Trigger container assignment
     fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     // Get the allocation for the applications and verify headroom
     Allocation allocation1 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("Allocation headroom", 1 * GB, allocation1
         .getResourceLimit().getMemorySize());
 
     Allocation allocation2 =
-        fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null, null, null);
+        fs.allocate(appAttemptId2, emptyAsk, emptyId,
+            null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("Allocation headroom", 1 * GB, allocation2
         .getResourceLimit().getMemorySize());
 


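The TestFifoScheduler hunks show the same migration on the FifoScheduler side: only the parameter list of allocate() changes, while the returned Allocation is inspected exactly as before (allocation.getResourceLimit(), allocation.getContainers()). A short sketch under that assumption; the class and method names below are illustrative only and not part of the patch.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;

// Illustrative sketch only, not part of the patch.
final class FifoAllocateSketch {
  static long headroomAfterEmptyAsk(FifoScheduler fs,
      ApplicationAttemptId attemptId) {
    List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
    List<ContainerId> emptyRelease = new ArrayList<ContainerId>();
    Allocation allocation = fs.allocate(attemptId, emptyAsk, emptyRelease,
        null, null,                 // no blacklist changes
        new ContainerUpdates());    // no container update requests
    return allocation.getResourceLimit().getMemorySize();
  }
}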