YARN-5959. RM changes to support change of container ExecutionType. (Arun Suresh via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a55bd84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a55bd84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a55bd84

Branch: refs/heads/HADOOP-13345
Commit: 0a55bd841ec0f2eb89a0383f4c589526e8b138d4
Parents: a605ff3
Author: Wangda Tan <wan...@apache.org>
Authored: Thu Jan 5 10:31:05 2017 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Thu Jan 5 10:31:05 2017 -0800

----------------------------------------------------------------------
 .../v2/app/rm/TestRMContainerAllocator.java     |  12 +-
 .../sls/scheduler/ResourceSchedulerWrapper.java |   9 +-
 .../sls/scheduler/SLSCapacityScheduler.java     |   9 +-
 .../yarn/api/records/ContainerUpdateType.java   |   9 +-
 .../yarn/api/records/UpdateContainerError.java  |   6 +
 .../api/records/UpdateContainerRequest.java     |  10 +
 .../src/main/proto/yarn_service_protos.proto    |   3 +-
 .../api/impl/TestAMRMClientOnRMRestart.java     |  11 +-
 .../OpportunisticContainerAllocator.java        |  16 +-
 .../OpportunisticContainerContext.java          |   8 +-
 .../server/scheduler/SchedulerRequestKey.java   |  51 ++-
 .../ApplicationMasterService.java               |  53 ++-
 ...pportunisticContainerAllocatorAMService.java |  18 +-
 .../server/resourcemanager/RMServerUtils.java   | 142 +++---
 .../rmapp/attempt/RMAppAttemptImpl.java         |   6 +-
 .../rmcontainer/RMContainerImpl.java            |  32 +-
 .../scheduler/AbstractYarnScheduler.java        | 104 ++++-
 .../resourcemanager/scheduler/Allocation.java   |  26 +-
 .../scheduler/AppSchedulingInfo.java            |  70 +--
 .../scheduler/ContainerUpdateContext.java       | 267 +++++++++++
 .../scheduler/ContainerUpdates.java             |  68 +++
 .../scheduler/SchedulerApplicationAttempt.java  | 170 ++++++-
 .../scheduler/SchedulerUtils.java               |  22 +
 .../scheduler/YarnScheduler.java                |   9 +-
 .../scheduler/capacity/CapacityScheduler.java   |  16 +-
 .../allocator/RegularContainerAllocator.java    |   2 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  13 +-
 .../scheduler/fair/FSAppAttempt.java            |   4 +-
 .../scheduler/fair/FairScheduler.java           |  41 +-
 .../scheduler/fifo/FifoAppAttempt.java          |   4 +-
 .../scheduler/fifo/FifoScheduler.java           |   5 +-
 .../LocalitySchedulingPlacementSet.java         |  46 +-
 .../placement/SchedulingPlacementSet.java       |   5 +-
 .../server/resourcemanager/Application.java     |   4 +-
 .../yarn/server/resourcemanager/MockAM.java     |   7 +
 .../yarn/server/resourcemanager/MockNM.java     |   6 +
 .../resourcemanager/TestClientRMService.java    |   5 +-
 ...pportunisticContainerAllocatorAMService.java | 456 ++++++++++++++++++-
 .../attempt/TestRMAppAttemptTransitions.java    |  19 +-
 .../rmcontainer/TestRMContainerImpl.java        |   7 +-
 .../capacity/TestCapacityScheduler.java         |  37 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   5 +-
 .../scheduler/capacity/TestReservations.java    |   9 +-
 .../scheduler/fair/FairSchedulerTestBase.java   |  14 +-
 .../fair/TestContinuousScheduling.java          |   9 +-
 .../scheduler/fair/TestFairScheduler.java       |  35 +-
 .../scheduler/fifo/TestFifoScheduler.java       |  46 +-
 47 files changed, 1612 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index f9ee9cc..e6aee6e 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -110,7 +110,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -132,6 +131,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -1707,8 +1707,7 @@ public class TestRMContainerAllocator {
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
         List<ContainerId> release, List<String> blacklistAdditions,
         List<String> blacklistRemovals,
-        List<UpdateContainerRequest> increaseRequests,
-        List<UpdateContainerRequest> decreaseRequests) {
+        ContainerUpdates updateRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
         ResourceRequest reqCopy = ResourceRequest.newInstance(req
@@ -1723,7 +1722,7 @@ public class TestRMContainerAllocator {
       lastBlacklistRemovals = blacklistRemovals;
       Allocation allocation = super.allocate(
           applicationAttemptId, askCopy, release, blacklistAdditions,
-          blacklistRemovals, increaseRequests, decreaseRequests);
+          blacklistRemovals, updateRequests);
       if (forceResourceLimit != null) {
         // Test wants to force the non-default resource limit
         allocation.setResourceLimit(forceResourceLimit);
@@ -1754,8 +1753,7 @@ public class TestRMContainerAllocator {
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
         List<ContainerId> release, List<String> blacklistAdditions,
         List<String> blacklistRemovals,
-        List<UpdateContainerRequest> increaseRequest,
-        List<UpdateContainerRequest> decreaseRequests) {
+        ContainerUpdates updateRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
         ResourceRequest reqCopy = ResourceRequest.newInstance(req
@@ -1766,7 +1764,7 @@ public class TestRMContainerAllocator {
       SecurityUtil.setTokenServiceUseIp(false);
       Allocation normalAlloc = super.allocate(
           applicationAttemptId, askCopy, release,
-          blacklistAdditions, blacklistRemovals, null, null);
+          blacklistAdditions, blacklistRemovals, updateRequests);
       List<Container> containers = normalAlloc.getContainers();
       if(containers.size() > 0) {
         // allocate excess container

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 79f934c..e66de2f 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -68,6 +67,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -206,14 +206,13 @@ final public class ResourceSchedulerWrapper
   public Allocation allocate(ApplicationAttemptId attemptId,
       List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
       List<String> strings, List<String> strings2,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests) {
+      ContainerUpdates updateRequests) {
     if (metricsON) {
       final Timer.Context context = schedulerAllocateTimer.time();
       Allocation allocation = null;
       try {
         allocation = scheduler.allocate(attemptId, resourceRequests,
-                containerIds, strings, strings2, null, null);
+                containerIds, strings, strings2, updateRequests);
         return allocation;
       } finally {
         context.stop();
@@ -227,7 +226,7 @@ final public class ResourceSchedulerWrapper
       }
     } else {
       return scheduler.allocate(attemptId,
-              resourceRequests, containerIds, strings, strings2, null, null);
+              resourceRequests, containerIds, strings, strings2, updateRequests);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 89f9ad3..8388273 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -51,10 +51,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -182,15 +182,14 @@ public class SLSCapacityScheduler extends 
CapacityScheduler implements
   public Allocation allocate(ApplicationAttemptId attemptId,
       List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
       List<String> strings, List<String> strings2,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests) {
+      ContainerUpdates updateRequests) {
     if (metricsON) {
       final Timer.Context context = schedulerAllocateTimer.time();
       Allocation allocation = null;
       try {
         allocation = super
             .allocate(attemptId, resourceRequests, containerIds, strings,
-                strings2, increaseRequests, decreaseRequests);
+                strings2, updateRequests);
         return allocation;
       } finally {
         context.stop();
@@ -204,7 +203,7 @@ public class SLSCapacityScheduler extends CapacityScheduler 
implements
       }
     } else {
       return super.allocate(attemptId, resourceRequests, containerIds, strings,
-          strings2, increaseRequests, decreaseRequests);
+          strings2, updateRequests);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
index 978ea09..6109cdb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
@@ -39,7 +39,12 @@ public enum ContainerUpdateType {
   DECREASE_RESOURCE,
 
   /**
-   * Execution Type change.
+   * Execution Type promotion.
    */
-  UPDATE_EXECUTION_TYPE
+  PROMOTE_EXECUTION_TYPE,
+
+  /**
+   * Execution Type demotion.
+   */
+  DEMOTE_EXECUTION_TYPE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
index 7102f7b..e7458cf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -87,6 +87,12 @@ public abstract class UpdateContainerError {
   }
 
   @Override
+  public String toString() {
+    return "UpdateContainerError{reason=" + getReason() + ", "
+        + "req=" + getUpdateContainerRequest() + "}";
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
index e4f7a82..925a7979 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
@@ -150,11 +150,13 @@ public abstract class UpdateContainerRequest extends 
AbstractResourceRequest {
     ContainerId cId = getContainerId();
     ExecutionType execType = getExecutionType();
     Resource capability = getCapability();
+    ContainerUpdateType updateType = getContainerUpdateType();
     result =
         prime * result + ((capability == null) ? 0 : capability.hashCode());
     result = prime * result + ((cId == null) ? 0 : cId.hashCode());
     result = prime * result + getContainerVersion();
     result = prime * result + ((execType == null) ? 0 : execType.hashCode());
+    result = prime * result + ((updateType== null) ? 0 : updateType.hashCode());
     return result;
   }
 
@@ -208,6 +210,14 @@ public abstract class UpdateContainerRequest extends 
AbstractResourceRequest {
     } else if (!execType.equals(other.getExecutionType())) {
       return false;
     }
+    ContainerUpdateType updateType = getContainerUpdateType();
+    if (updateType == null) {
+      if (other.getContainerUpdateType() != null) {
+        return false;
+      }
+    } else if (!updateType.equals(other.getContainerUpdateType())) {
+      return false;
+    }
     return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index d9230d4..aed1580 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -63,7 +63,8 @@ message FinishApplicationMasterResponseProto {
 enum ContainerUpdateTypeProto {
   INCREASE_RESOURCE = 0;
   DECREASE_RESOURCE = 1;
-  UPDATE_EXECUTION_TYPE = 2;
+  PROMOTE_EXECUTION_TYPE = 2;
+  DEMOTE_EXECUTION_TYPE = 3;
 }
 
 message UpdateContainerRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index f1c49f2..ac77446 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -66,6 +66,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -574,8 +575,7 @@ public class TestAMRMClientOnRMRestart {
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
         List<ContainerId> release, List<String> blacklistAdditions,
         List<String> blacklistRemovals,
-        List<UpdateContainerRequest> increaseRequests,
-        List<UpdateContainerRequest> decreaseRequests) {
+        ContainerUpdates updateRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
         ResourceRequest reqCopy =
@@ -586,13 +586,12 @@ public class TestAMRMClientOnRMRestart {
       }
       lastAsk = ask;
       lastRelease = release;
-      lastIncrease = increaseRequests;
-      lastDecrease = decreaseRequests;
+      lastIncrease = updateRequests.getIncreaseRequests();
+      lastDecrease = updateRequests.getDecreaseRequests();
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(applicationAttemptId, askCopy, release,
-          blacklistAdditions, blacklistRemovals, increaseRequests,
-          decreaseRequests);
+          blacklistAdditions, blacklistRemovals, updateRequests);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index c1300b2..6fd5228 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -321,13 +321,21 @@ public class OpportunisticContainerAllocator {
     // before accepting an ask)
     Resource capability = normalizeCapability(appParams, rr);
 
+    return createContainer(
+        rmIdentifier, appParams.getContainerTokenExpiryInterval(),
+        SchedulerRequestKey.create(rr), userName, node, cId, capability);
+  }
+
+  private Container createContainer(long rmIdentifier, long tokenExpiry,
+      SchedulerRequestKey schedulerKey, String userName, RemoteNode node,
+      ContainerId cId, Resource capability) {
     long currTime = System.currentTimeMillis();
     ContainerTokenIdentifier containerTokenIdentifier =
         new ContainerTokenIdentifier(
             cId, 0, node.getNodeId().toString(), userName,
-            capability, currTime + appParams.containerTokenExpiryInterval,
+            capability, currTime + tokenExpiry,
             tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
-            rr.getPriority(), currTime,
+            schedulerKey.getPriority(), currTime,
             null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
             ExecutionType.OPPORTUNISTIC);
     byte[] pwd =
@@ -336,9 +344,9 @@ public class OpportunisticContainerAllocator {
         containerTokenIdentifier);
     Container container = BuilderUtils.newContainer(
         cId, node.getNodeId(), node.getHttpAddress(),
-        capability, rr.getPriority(), containerToken,
+        capability, schedulerKey.getPriority(), containerToken,
         containerTokenIdentifier.getExecutionType(),
-        rr.getAllocationRequestId());
+        schedulerKey.getAllocationRequestId());
     return container;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index a2f9f4d..1b1c5b9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -150,8 +150,9 @@ public class OpportunisticContainerContext {
             resourceRequest.getNumContainers() + request.getNumContainers());
       }
       if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at" +
-            "priority = "+ schedulerKey.getPriority()
+        LOG.info("# of outstandingOpReqs in ANY (at "
+            + "priority = " + schedulerKey.getPriority()
+            + ", allocationReqId = " + schedulerKey.getAllocationRequestId()
             + ", with capability = " + request.getCapability() + " ) : "
             + resourceRequest.getNumContainers());
       }
@@ -167,7 +168,8 @@ public class OpportunisticContainerContext {
   public void matchAllocationToOutstandingRequest(Resource capability,
       List<Container> allocatedContainers) {
     for (Container c : allocatedContainers) {
-      SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
+      SchedulerRequestKey schedulerKey =
+          SchedulerRequestKey.extractFrom(c);
       Map<Resource, ResourceRequest> asks =
           outstandingOpReqs.get(schedulerKey);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 9b7edbe..36a9149 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
+import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 
 /**
  * Composite key for outstanding scheduler requests for any schedulable entity.
@@ -31,6 +34,7 @@ public final class SchedulerRequestKey implements
 
   private final Priority priority;
   private final long allocationRequestId;
+  private final ContainerId containerToUpdate;
 
   /**
    * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
@@ -39,7 +43,13 @@ public final class SchedulerRequestKey implements
    */
   public static SchedulerRequestKey create(ResourceRequest req) {
     return new SchedulerRequestKey(req.getPriority(),
-        req.getAllocationRequestId());
+        req.getAllocationRequestId(), null);
+  }
+
+  public static SchedulerRequestKey create(UpdateContainerRequest req,
+      SchedulerRequestKey schedulerRequestKey) {
+    return new SchedulerRequestKey(schedulerRequestKey.getPriority(),
+        schedulerRequestKey.getAllocationRequestId(), req.getContainerId());
   }
 
   /**
@@ -50,12 +60,16 @@ public final class SchedulerRequestKey implements
    */
   public static SchedulerRequestKey extractFrom(Container container) {
     return new SchedulerRequestKey(container.getPriority(),
-        container.getAllocationRequestId());
+        container.getAllocationRequestId(), null);
   }
 
-  SchedulerRequestKey(Priority priority, long allocationRequestId) {
+
+
+  public SchedulerRequestKey(Priority priority, long allocationRequestId,
+      ContainerId containerToUpdate) {
     this.priority = priority;
     this.allocationRequestId = allocationRequestId;
+    this.containerToUpdate = containerToUpdate;
   }
 
   /**
@@ -76,6 +90,10 @@ public final class SchedulerRequestKey implements
     return allocationRequestId;
   }
 
+  public ContainerId getContainerToUpdate() {
+    return containerToUpdate;
+  }
+
   @Override
   public int compareTo(SchedulerRequestKey o) {
     if (o == null) {
@@ -85,6 +103,15 @@ public final class SchedulerRequestKey implements
         return 1;
       }
     }
+
+    // Ensure updates are ranked higher
+    if (this.containerToUpdate == null && o.containerToUpdate != null) {
+      return -1;
+    }
+    if (this.containerToUpdate != null && o.containerToUpdate == null) {
+      return 1;
+    }
+
     int priorityCompare = o.getPriority().compareTo(priority);
     // we first sort by priority and then by allocationRequestId
     if (priorityCompare != 0) {
@@ -107,16 +134,21 @@ public final class SchedulerRequestKey implements
     if (getAllocationRequestId() != that.getAllocationRequestId()) {
       return false;
     }
-    return getPriority() != null ?
-        getPriority().equals(that.getPriority()) :
-        that.getPriority() == null;
+    if (!getPriority().equals(that.getPriority())) {
+      return false;
+    }
+    return containerToUpdate != null ?
+        containerToUpdate.equals(that.containerToUpdate) :
+        that.containerToUpdate == null;
   }
 
   @Override
   public int hashCode() {
-    int result = getPriority() != null ? getPriority().hashCode() : 0;
-    result = 31 * result + (int) (getAllocationRequestId() ^ (
-        getAllocationRequestId() >>> 32));
+    int result = priority != null ? priority.hashCode() : 0;
+    result = 31 * result + (int) (allocationRequestId ^ (allocationRequestId
+        >>> 32));
+    result = 31 * result + (containerToUpdate != null ? containerToUpdate
+        .hashCode() : 0);
     return result;
   }
 
@@ -125,6 +157,7 @@ public final class SchedulerRequestKey implements
     return "SchedulerRequestKey{" +
         "priority=" + priority +
         ", allocationRequestId=" + allocationRequestId +
+        ", containerToUpdate=" + containerToUpdate +
         '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 9fd1845..70a46a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -93,7 +92,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security
@@ -559,12 +561,10 @@ public class ApplicationMasterService extends AbstractService implements
     // Split Update Resource Requests into increase and decrease.
     // No Exceptions are thrown here. All update errors are aggregated
     // and returned to the AM.
-    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
-    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
-    List<UpdateContainerError> updateContainerErrors =
+    List<UpdateContainerError> updateErrors = new ArrayList<>();
+    ContainerUpdates containerUpdateRequests =
         RMServerUtils.validateAndSplitUpdateResourceRequests(
-            rmContext, request, maximumCapacity,
-            increaseResourceReqs, decreaseResourceReqs);
+        rmContext, request, maximumCapacity, updateErrors);
 
     // Send new requests to appAttempt.
     Allocation allocation;
@@ -580,7 +580,7 @@ public class ApplicationMasterService extends AbstractService implements
       allocation =
           this.rScheduler.allocate(appAttemptId, ask, release,
               blacklistAdditions, blacklistRemovals,
-              increaseResourceReqs, decreaseResourceReqs);
+              containerUpdateRequests);
     }
 
     if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
@@ -596,7 +596,7 @@ public class ApplicationMasterService extends AbstractService implements
     }
 
     // Notify the AM of container update errors
-    addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
+    addToUpdateContainerErrors(allocateResponse, updateErrors);
 
     // update the response with the deltas of node status changes
     List<RMNode> updatedNodes = new ArrayList<RMNode>();
@@ -630,15 +630,7 @@ public class ApplicationMasterService extends AbstractService implements
         .pullJustFinishedContainers());
     allocateResponse.setAvailableResources(allocation.getResourceLimit());
 
-    // Handling increased containers
-    addToUpdatedContainers(
-        allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
-        allocation.getIncreasedContainers());
-
-    // Handling decreased containers
-    addToUpdatedContainers(
-        allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
-        allocation.getDecreasedContainers());
+    addToContainerUpdates(appAttemptId, allocateResponse, allocation);
 
     allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
 
@@ -658,6 +650,33 @@ public class ApplicationMasterService extends AbstractService implements
         .getApplicationPriority());
   }
 
+  private void addToContainerUpdates(ApplicationAttemptId appAttemptId,
+      AllocateResponse allocateResponse, Allocation allocation) {
+    // Handling increased containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
+        allocation.getIncreasedContainers());
+
+    // Handling decreased containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
+        allocation.getDecreasedContainers());
+
+    // Handling promoted containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+        allocation.getPromotedContainers());
+
+    // Handling demoted containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+        allocation.getDemotedContainers());
+
+    addToUpdateContainerErrors(allocateResponse,
+        ((AbstractYarnScheduler)rScheduler)
+            .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
+  }
+
   protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
       List<UpdateContainerError> updateContainerErrors) {
     if (!updateContainerErrors.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 9d4c092..8f3a888 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
 
-
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
 
-
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@@ -57,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
 
@@ -69,9 +68,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 
-
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 
 import java.io.IOException;
@@ -251,6 +250,7 @@ public class OpportunisticContainerAllocatorAMService
 
     // Allocate GUARANTEED containers.
     request.setAskList(partitionedAsks.getGuaranteed());
+
     super.allocateInternal(appAttemptId, request, allocateResponse);
   }
 
@@ -298,15 +298,9 @@ public class OpportunisticContainerAllocatorAMService
       boolean isRemotelyAllocated) {
     for (Container container : allocContainers) {
       // Create RMContainer
-      SchedulerApplicationAttempt appAttempt =
-          ((AbstractYarnScheduler) rmContext.getScheduler())
-              .getCurrentAttemptForContainer(container.getId());
-      RMContainer rmContainer = new RMContainerImpl(container,
-          appAttempt.getApplicationAttemptId(), container.getNodeId(),
-          appAttempt.getUser(), rmContext, isRemotelyAllocated);
-      appAttempt.addRMContainer(container.getId(), rmContainer);
-      ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
-          container.getNodeId()).allocateContainer(rmContainer);
+      RMContainer rmContainer =
+          SchedulerUtils.createOpportunisticRmContainer(
+              rmContext, container, isRemotelyAllocated);
       rmContainer.handle(
           new RMContainerEvent(container.getId(),
               RMContainerEventType.ACQUIRED));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 74898ca..ebbeb0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
     .RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
@@ -80,7 +83,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  */
 public class RMServerUtils {
 
-  private static final String UPDATE_OUTSTANDING_ERROR =
+  public static final String UPDATE_OUTSTANDING_ERROR =
       "UPDATE_OUTSTANDING_ERROR";
   private static final String INCORRECT_CONTAINER_VERSION_ERROR =
       "INCORRECT_CONTAINER_VERSION_ERROR";
@@ -124,74 +127,105 @@ public class RMServerUtils {
 
   /**
    * Check if we have:
-   * - Request for same containerId and different target resource
-   * - If targetResources violates maximum/minimumAllocation
-   * @param rmContext RM context
-   * @param request Allocate Request
-   * @param maximumAllocation Maximum Allocation
-   * @param increaseResourceReqs Increase Resource Request
-   * @param decreaseResourceReqs Decrease Resource Request
-   * @return List of container Errors
+   * - Request for same containerId and different target resource.
+   * - If targetResources violates maximum/minimumAllocation.
+   * @param rmContext RM context.
+   * @param request Allocate Request.
+   * @param maximumAllocation Maximum Allocation.
+   * @param updateErrors Container update errors.
+   * @return ContainerUpdateRequests.
    */
-  public static List<UpdateContainerError>
+  public static ContainerUpdates
       validateAndSplitUpdateResourceRequests(RMContext rmContext,
       AllocateRequest request, Resource maximumAllocation,
-      List<UpdateContainerRequest> increaseResourceReqs,
-      List<UpdateContainerRequest> decreaseResourceReqs) {
-    List<UpdateContainerError> errors = new ArrayList<>();
+      List<UpdateContainerError> updateErrors) {
+    ContainerUpdates updateRequests =
+        new ContainerUpdates();
     Set<ContainerId> outstandingUpdate = new HashSet<>();
     for (UpdateContainerRequest updateReq : request.getUpdateRequests()) {
       RMContainer rmContainer = rmContext.getScheduler().getRMContainer(
           updateReq.getContainerId());
-      String msg = null;
-      if (rmContainer == null) {
-        msg = INVALID_CONTAINER_ID;
-      }
-      // Only allow updates if the requested version matches the current
-      // version
-      if (msg == null && updateReq.getContainerVersion() !=
-          rmContainer.getContainer().getVersion()) {
-        msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
-            + updateReq.getContainerVersion() + "|"
-            + rmContainer.getContainer().getVersion();
-      }
-      // No more than 1 container update per request.
-      if (msg == null &&
-          outstandingUpdate.contains(updateReq.getContainerId())) {
-        msg = UPDATE_OUTSTANDING_ERROR;
-      }
+      String msg = validateContainerIdAndVersion(outstandingUpdate,
+          updateReq, rmContainer);
+      ContainerUpdateType updateType = updateReq.getContainerUpdateType();
       if (msg == null) {
-        Resource original = rmContainer.getContainer().getResource();
-        Resource target = updateReq.getCapability();
-        if (Resources.fitsIn(target, original)) {
-          // This is a decrease request
-          if (validateIncreaseDecreaseRequest(rmContext, updateReq,
-              maximumAllocation, false)) {
-            decreaseResourceReqs.add(updateReq);
-            outstandingUpdate.add(updateReq.getContainerId());
+        if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
+            (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
+          Resource original = rmContainer.getContainer().getResource();
+          Resource target = updateReq.getCapability();
+          if (Resources.fitsIn(target, original)) {
+            // This is a decrease request
+            if (validateIncreaseDecreaseRequest(rmContext, updateReq,
+                maximumAllocation, false)) {
+              updateRequests.getDecreaseRequests().add(updateReq);
+              outstandingUpdate.add(updateReq.getContainerId());
+            } else {
+              msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+            }
           } else {
-            msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+            // This is an increase request
+            if (validateIncreaseDecreaseRequest(rmContext, updateReq,
+                maximumAllocation, true)) {
+              updateRequests.getIncreaseRequests().add(updateReq);
+              outstandingUpdate.add(updateReq.getContainerId());
+            } else {
+              msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+            }
           }
         } else {
-          // This is an increase request
-          if (validateIncreaseDecreaseRequest(rmContext, updateReq,
-              maximumAllocation, true)) {
-            increaseResourceReqs.add(updateReq);
-            outstandingUpdate.add(updateReq.getContainerId());
-          } else {
-            msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+          ExecutionType original = rmContainer.getExecutionType();
+          ExecutionType target = updateReq.getExecutionType();
+          if (target != original) {
+            if (target == ExecutionType.GUARANTEED &&
+                original == ExecutionType.OPPORTUNISTIC) {
+              updateRequests.getPromotionRequests().add(updateReq);
+              outstandingUpdate.add(updateReq.getContainerId());
+            } else if (target == ExecutionType.OPPORTUNISTIC &&
+                original == ExecutionType.GUARANTEED) {
+              updateRequests.getDemotionRequests().add(updateReq);
+              outstandingUpdate.add(updateReq.getContainerId());
+            }
           }
         }
       }
-      if (msg != null) {
-        UpdateContainerError updateError = RECORD_FACTORY
-            .newRecordInstance(UpdateContainerError.class);
-        updateError.setReason(msg);
-        updateError.setUpdateContainerRequest(updateReq);
-        errors.add(updateError);
-      }
+      checkAndcreateUpdateError(updateErrors, updateReq, msg);
+    }
+    return updateRequests;
+  }
+
+  private static void checkAndcreateUpdateError(
+      List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
+      String msg) {
+    if (msg != null) {
+      UpdateContainerError updateError = RECORD_FACTORY
+          .newRecordInstance(UpdateContainerError.class);
+      updateError.setReason(msg);
+      updateError.setUpdateContainerRequest(updateReq);
+      errors.add(updateError);
+    }
+  }
+
+  private static String validateContainerIdAndVersion(
+      Set<ContainerId> outstandingUpdate, UpdateContainerRequest updateReq,
+      RMContainer rmContainer) {
+    String msg = null;
+    if (rmContainer == null) {
+      msg = INVALID_CONTAINER_ID;
+    }
+    // Only allow updates if the requested version matches the current
+    // version
+    if (msg == null && updateReq.getContainerVersion() !=
+        rmContainer.getContainer().getVersion()) {
+      msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
+          + updateReq.getContainerVersion() + "|"
+          + rmContainer.getContainer().getVersion();
+    }
+    // No more than 1 container update per request.
+    if (msg == null &&
+        outstandingUpdate.contains(updateReq.getContainerId())) {
+      msg = UPDATE_OUTSTANDING_ERROR;
     }
-    return errors;
+    return msg;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index f7ae488..ab84985 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -1072,7 +1073,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
                 Collections.singletonList(appAttempt.amReq),
                 EMPTY_CONTAINER_RELEASE_LIST,
                 amBlacklist.getBlacklistAdditions(),
-                amBlacklist.getBlacklistRemovals(), null, null);
+                amBlacklist.getBlacklistRemovals(),
+                new ContainerUpdates());
         if (amContainerAllocation != null
             && amContainerAllocation.getContainers() != null) {
           assert (amContainerAllocation.getContainers().size() == 0);
@@ -1096,7 +1098,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       Allocation amContainerAllocation =
           appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
             EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
-            null, null, null);
+            null, new ContainerUpdates());
       // There must be at least one container allocated, because a
       // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
       // and is put in SchedulerApplication#newlyAllocatedContainers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index dbc6169..79709a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -108,6 +108,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     // Transitions from ACQUIRED state
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
         RMContainerEventType.LAUNCHED)
+    .addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED,
+        RMContainerEventType.ACQUIRED)
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
         RMContainerEventType.FINISHED, new FinishedTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
@@ -125,6 +127,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+        RMContainerEventType.ACQUIRED)
+    .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
@@ -163,13 +167,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private final WriteLock writeLock;
   private final ApplicationAttemptId appAttemptId;
   private final NodeId nodeId;
-  private final Container container;
   private final RMContext rmContext;
   private final EventHandler eventHandler;
   private final ContainerAllocationExpirer containerAllocationExpirer;
   private final String user;
   private final String nodeLabelExpression;
 
+  private volatile Container container;
   private Resource reservedResource;
   private NodeId reservedNode;
   private SchedulerRequestKey reservedSchedulerKey;
@@ -188,44 +192,44 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private boolean isExternallyAllocated;
   private SchedulerRequestKey allocatedSchedulerKey;
 
-  public RMContainerImpl(Container container,
+  public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext) {
-    this(container, appAttemptId, nodeId, user, rmContext, System
+    this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
         .currentTimeMillis(), "");
   }
 
-  public RMContainerImpl(Container container,
+  public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext, boolean isExternallyAllocated) {
-    this(container, appAttemptId, nodeId, user, rmContext, System
+    this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
         .currentTimeMillis(), "", isExternallyAllocated);
   }
 
   private boolean saveNonAMContainerMetaInfo;
 
-  public RMContainerImpl(Container container,
+  public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext, String nodeLabelExpression) {
-    this(container, appAttemptId, nodeId, user, rmContext, System
+    this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
       .currentTimeMillis(), nodeLabelExpression);
   }
 
-  public RMContainerImpl(Container container,
+  public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext, long creationTime, String nodeLabelExpression) {
-    this(container, appAttemptId, nodeId, user, rmContext, creationTime,
-        nodeLabelExpression, false);
+    this(container, schedulerKey, appAttemptId, nodeId, user, rmContext,
+        creationTime, nodeLabelExpression, false);
   }
 
-  public RMContainerImpl(Container container,
+  public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext, long creationTime, String nodeLabelExpression,
       boolean isExternallyAllocated) {
     this.stateMachine = stateMachineFactory.make(this);
     this.nodeId = nodeId;
     this.container = container;
-    this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container);
+    this.allocatedSchedulerKey = schedulerKey;
     this.appAttemptId = appAttemptId;
     this.user = user;
     this.creationTime = creationTime;
@@ -276,6 +280,10 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     return this.container;
   }
 
+  public void setContainer(Container container) {
+    this.container = container;
+  }
+
   @Override
   public RMContainerState getState() {
     this.readLock.lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index c1a985d..acfcde8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
@@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -81,6 +83,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -504,9 +511,11 @@ public abstract class AbstractYarnScheduler
     ApplicationAttemptId attemptId =
         container.getId().getApplicationAttemptId();
     RMContainer rmContainer =
-        new RMContainerImpl(container, attemptId, node.getNodeID(),
-          applications.get(attemptId.getApplicationId()).getUser(), rmContext,
-          status.getCreationTime(), status.getNodeLabelExpression());
+        new RMContainerImpl(container,
+            SchedulerRequestKey.extractFrom(container), attemptId,
+            node.getNodeID(), applications.get(
+            attemptId.getApplicationId()).getUser(), rmContext,
+            status.getCreationTime(), status.getNodeLabelExpression());
     return rmContainer;
   }
 
@@ -1053,4 +1062,93 @@ public abstract class AbstractYarnScheduler
       normalizeRequest(ask);
     }
   }
+
+  protected void handleExecutionTypeUpdates(
+      SchedulerApplicationAttempt appAttempt,
+      List<UpdateContainerRequest> promotionRequests,
+      List<UpdateContainerRequest> demotionRequests) {
+    if (promotionRequests != null && !promotionRequests.isEmpty()) {
+      LOG.info("Promotion Update requests : " + promotionRequests);
+      handlePromotionRequests(appAttempt, promotionRequests);
+    }
+    if (demotionRequests != null && !demotionRequests.isEmpty()) {
+      LOG.info("Demotion Update requests : " + demotionRequests);
+      handleDemotionRequests(appAttempt, demotionRequests);
+    }
+  }
+
+  private void handlePromotionRequests(
+      SchedulerApplicationAttempt applicationAttempt,
+      List<UpdateContainerRequest> updateContainerRequests) {
+    for (UpdateContainerRequest uReq : updateContainerRequests) {
+      RMContainer rmContainer =
+          rmContext.getScheduler().getRMContainer(uReq.getContainerId());
+      // Check if this is a container update
+      // And not in the middle of a Demotion
+      if (rmContainer != null) {
+        // Check if this is an executionType change request
+        // If so, fix the rr to make it look like a normal rr
+        // with relaxLocality=false and numContainers=1
+        SchedulerNode schedulerNode = rmContext.getScheduler()
+            .getSchedulerNode(rmContainer.getContainer().getNodeId());
+
+        // Add only if no outstanding promote requests exist.
+        if (!applicationAttempt.getUpdateContext()
+            .checkAndAddToOutstandingIncreases(
+                rmContainer, schedulerNode, uReq)) {
+          applicationAttempt.addToUpdateContainerErrors(
+              UpdateContainerError.newInstance(
+              RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
+        }
+      } else {
+        LOG.warn("Cannot promote non-existent (or completed) Container ["
+            + uReq.getContainerId() + "]");
+      }
+    }
+  }
+
+  private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
+      List<UpdateContainerRequest> demotionRequests) {
+    OpportunisticContainerContext oppCntxt =
+        appAttempt.getOpportunisticContainerContext();
+    for (UpdateContainerRequest uReq : demotionRequests) {
+      RMContainer rmContainer =
+          rmContext.getScheduler().getRMContainer(uReq.getContainerId());
+      if (rmContainer != null) {
+        if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
+            rmContainer.getContainer())) {
+          RMContainer demotedRMContainer =
+              createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
+          appAttempt.addToNewlyDemotedContainers(
+              uReq.getContainerId(), demotedRMContainer);
+        } else {
+          appAttempt.addToUpdateContainerErrors(
+              UpdateContainerError.newInstance(
+              RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
+        }
+      } else {
+        LOG.warn("Cannot demote non-existent (or completed) Container ["
+            + uReq.getContainerId() + "]");
+      }
+    }
+  }
+
+  private RMContainer createDemotedRMContainer(
+      SchedulerApplicationAttempt appAttempt,
+      OpportunisticContainerContext oppCntxt,
+      RMContainer rmContainer) {
+    SchedulerRequestKey sk =
+        SchedulerRequestKey.extractFrom(rmContainer.getContainer());
+    Container demotedContainer = BuilderUtils.newContainer(
+        ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
+            oppCntxt.getContainerIdGenerator().generateContainerId()),
+        rmContainer.getContainer().getNodeId(),
+        rmContainer.getContainer().getNodeHttpAddress(),
+        rmContainer.getContainer().getResource(),
+        sk.getPriority(), null, ExecutionType.OPPORTUNISTIC,
+        sk.getAllocationRequestId());
+    demotedContainer.setVersion(rmContainer.getContainer().getVersion());
+    return SchedulerUtils.createOpportunisticRmContainer(
+        rmContext, demotedContainer, false);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
index b81da2b..43eadab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
@@ -36,6 +36,8 @@ public class Allocation {
   final List<NMToken> nmTokens;
   final List<Container> increasedContainers;
   final List<Container> decreasedContainers;
+  final List<Container> promotedContainers;
+  final List<Container> demotedContainers;
   private Resource resourceLimit;
 
 
@@ -50,13 +52,23 @@ public class Allocation {
       Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
       List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
     this(containers,  resourceLimit,strictContainers,  fungibleContainers,
-      fungibleResources, nmTokens, null, null);
+      fungibleResources, nmTokens, null, null, null, null);
   }
-  
+
   public Allocation(List<Container> containers, Resource resourceLimit,
       Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
       List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
       List<Container> increasedContainers, List<Container> decreasedContainer) {
+    this(containers,  resourceLimit,strictContainers,  fungibleContainers,
+        fungibleResources, nmTokens, increasedContainers, decreasedContainer,
+        null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+      List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
+      List<Container> increasedContainers, List<Container> decreasedContainer,
+      List<Container> promotedContainers, List<Container> demotedContainer) {
     this.containers = containers;
     this.resourceLimit = resourceLimit;
     this.strictContainers = strictContainers;
@@ -65,6 +77,8 @@ public class Allocation {
     this.nmTokens = nmTokens;
     this.increasedContainers = increasedContainers;
     this.decreasedContainers = decreasedContainer;
+    this.promotedContainers = promotedContainers;
+    this.demotedContainers = demotedContainer;
   }
 
   public List<Container> getContainers() {
@@ -99,6 +113,14 @@ public class Allocation {
     return decreasedContainers;
   }
 
+  public List<Container> getPromotedContainers() {
+    return promotedContainers;
+  }
+
+  public List<Container> getDemotedContainers() {
+    return demotedContainers;
+  }
+
   @VisibleForTesting
   public void setResourceLimit(Resource resource) {
     this.resourceLimit = resource;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 0551df1..d901d90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -55,7 +55,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * This class keeps track of all the consumption of an application. This also
  * keeps track of current running/completed containers for the application.
@@ -92,10 +91,11 @@ public class AppSchedulingInfo {
   final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
       SchedContainerChangeRequest>>> containerIncreaseRequestMap =
       new ConcurrentHashMap<>();
-
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
+  public final ContainerUpdateContext updateContext;
+
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       long epoch, ResourceUsage appResourceUsage) {
@@ -109,6 +109,7 @@ public class AppSchedulingInfo {
     this.appResourceUsage = appResourceUsage;
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    updateContext = new ContainerUpdateContext(this);
     readLock = lock.readLock();
     writeLock = lock.writeLock();
   }
@@ -376,6 +377,10 @@ public class AppSchedulingInfo {
     }
   }
 
+  public ContainerUpdateContext getUpdateContext() {
+    return updateContext;
+  }
+
   /**
    * The ApplicationMaster is updating resource requirements for the
    * application, by asking for more resources and releasing resources acquired
@@ -413,29 +418,9 @@ public class AppSchedulingInfo {
       }
 
       // Update scheduling placement set
-      for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) {
-        SchedulerRequestKey schedulerRequestKey = entry.getKey();
-
-        if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
-          schedulerKeyToPlacementSets.put(schedulerRequestKey,
-              new LocalitySchedulingPlacementSet<>(this));
-        }
-
-        // Update placement set
-        ResourceRequestUpdateResult pendingAmountChanges =
-            schedulerKeyToPlacementSets.get(schedulerRequestKey)
-                .updateResourceRequests(
-                    entry.getValue().values(),
-                    recoverPreemptedRequestForAContainer);
-
-        if (null != pendingAmountChanges) {
-          updatePendingResources(
-              pendingAmountChanges.getLastAnyResourceRequest(),
-              pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
-              queue.getMetrics());
-          offswitchResourcesUpdated = true;
-        }
-      }
+      offswitchResourcesUpdated =
+          addToPlacementSets(
+              recoverPreemptedRequestForAContainer, dedupRequests);
 
       return offswitchResourcesUpdated;
     } finally {
@@ -443,6 +428,37 @@ public class AppSchedulingInfo {
     }
   }
 
+  boolean addToPlacementSets(
+      boolean recoverPreemptedRequestForAContainer,
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
+    boolean offswitchResourcesUpdated = false;
+    for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
+        dedupRequests.entrySet()) {
+      SchedulerRequestKey schedulerRequestKey = entry.getKey();
+
+      if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
+        schedulerKeyToPlacementSets.put(schedulerRequestKey,
+            new LocalitySchedulingPlacementSet<>(this));
+      }
+
+      // Update placement set
+      ResourceRequestUpdateResult pendingAmountChanges =
+          schedulerKeyToPlacementSets.get(schedulerRequestKey)
+              .updateResourceRequests(
+                  entry.getValue().values(),
+                  recoverPreemptedRequestForAContainer);
+
+      if (null != pendingAmountChanges) {
+        updatePendingResources(
+            pendingAmountChanges.getLastAnyResourceRequest(),
+            pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
+            queue.getMetrics());
+        offswitchResourcesUpdated = true;
+      }
+    }
+    return offswitchResourcesUpdated;
+  }
+
   private void updatePendingResources(ResourceRequest lastRequest,
       ResourceRequest request, SchedulerRequestKey schedulerKey,
       QueueMetrics metrics) {
@@ -717,8 +733,8 @@ public class AppSchedulingInfo {
         updateMetricsForAllocatedContainer(type, containerAllocated);
       }
 
-      return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node,
-          request);
+      return schedulerKeyToPlacementSets.get(schedulerKey)
+          .allocate(schedulerKey, type, node, request);
     } finally {
       writeLock.unlock();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to