YARN-5392. Replace use of Priority in the Scheduling infrastructure with an 
opaque SchedulerRequestKey. (asuresh and subru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5aace38b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5aace38b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5aace38b

Branch: refs/heads/HADOOP-12756
Commit: 5aace38b748ba71aaadd2c4d64eba8dc1f816828
Parents: d2cf8b5
Author: Arun Suresh <asur...@apache.org>
Authored: Thu Jul 21 20:57:44 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Tue Jul 26 14:54:03 2016 -0700

----------------------------------------------------------------------
 .../capacity/FifoCandidatesSelector.java        |  11 +-
 .../resourcemanager/resource/Priority.java      |  10 +-
 .../rmcontainer/RMContainer.java                |   7 +-
 .../rmcontainer/RMContainerImpl.java            |  19 +-
 .../rmcontainer/RMContainerReservedEvent.java   |  12 +-
 .../scheduler/AppSchedulingInfo.java            | 160 +++++++-------
 .../scheduler/SchedulerApplicationAttempt.java  | 160 ++++++++------
 .../scheduler/SchedulerNode.java                |   5 +-
 .../scheduler/SchedulerRequestKey.java          |  99 +++++++++
 .../scheduler/capacity/LeafQueue.java           |  10 +-
 .../allocator/IncreaseContainerAllocator.java   |  25 ++-
 .../allocator/RegularContainerAllocator.java    | 181 ++++++++--------
 .../scheduler/common/fica/FiCaSchedulerApp.java |  56 ++---
 .../common/fica/FiCaSchedulerNode.java          |   5 +-
 .../scheduler/fair/FSAppAttempt.java            | 213 +++++++++++--------
 .../scheduler/fair/FSSchedulerNode.java         |   5 +-
 .../scheduler/fair/FairScheduler.java           |   2 +-
 .../scheduler/fifo/FifoScheduler.java           |  87 ++++----
 .../server/resourcemanager/Application.java     | 120 ++++++-----
 .../yarn/server/resourcemanager/Task.java       |  11 +-
 ...alCapacityPreemptionPolicyMockFramework.java |   4 +
 ...estProportionalCapacityPreemptionPolicy.java |   4 +
 .../rmcontainer/TestRMContainerImpl.java        |   6 +-
 .../TestSchedulerApplicationAttempt.java        |  31 +--
 .../capacity/TestCapacityScheduler.java         |  10 +-
 .../scheduler/capacity/TestLeafQueue.java       | 170 ++++++++-------
 .../TestNodeLabelContainerAllocation.java       |   4 +-
 .../scheduler/capacity/TestReservations.java    |  52 +++--
 .../scheduler/capacity/TestUtils.java           |  12 ++
 .../scheduler/fair/TestFSAppAttempt.java        |  18 +-
 .../scheduler/fair/TestFairScheduler.java       |  15 +-
 .../fair/TestFairSchedulerPreemption.java       |  24 ++-
 32 files changed, 925 insertions(+), 623 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index a8c62fd..9df395d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -343,12 +342,10 @@ public class FifoCandidatesSelector
     Collections.sort(containers, new Comparator<RMContainer>() {
       @Override
       public int compare(RMContainer a, RMContainer b) {
-        Comparator<Priority> c = new org.apache.hadoop.yarn.server
-            .resourcemanager.resource.Priority.Comparator();
-        int priorityComp = c.compare(b.getContainer().getPriority(),
-            a.getContainer().getPriority());
-        if (priorityComp != 0) {
-          return priorityComp;
+        int schedKeyComp = b.getAllocatedSchedulerKey()
+                .compareTo(a.getAllocatedSchedulerKey());
+        if (schedKeyComp != 0) {
+          return schedKeyComp;
         }
         return b.getContainerId().compareTo(a.getContainerId());
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java
index 5060c4c..f098806 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java
@@ -27,13 +27,5 @@ public class Priority {
     priority.setPriority(prio);
     return priority;
   }
-  
-  public static class Comparator 
-  implements java.util.Comparator<org.apache.hadoop.yarn.api.records.Priority> 
{
-    @Override
-    public int compare(org.apache.hadoop.yarn.api.records.Priority o1, 
org.apache.hadoop.yarn.api.records.Priority o2) {
-      return o1.getPriority() - o2.getPriority();
-    }
-  }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 504c973..e5d1208 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
 
 /**
  * Represents the ResourceManager's view of an application container. See 
@@ -55,7 +56,7 @@ public interface RMContainer extends 
EventHandler<RMContainerEvent> {
 
   NodeId getReservedNode();
   
-  Priority getReservedPriority();
+  SchedulerRequestKey getReservedSchedulerKey();
 
   Resource getAllocatedResource();
 
@@ -63,6 +64,8 @@ public interface RMContainer extends 
EventHandler<RMContainerEvent> {
 
   NodeId getAllocatedNode();
 
+  SchedulerRequestKey getAllocatedSchedulerKey();
+
   Priority getAllocatedPriority();
 
   long getCreationTime();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index ed819a0..706821e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -53,12 +53,12 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
     .RMNodeDecreaseContainerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
@@ -173,7 +173,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
 
   private Resource reservedResource;
   private NodeId reservedNode;
-  private Priority reservedPriority;
+  private SchedulerRequestKey reservedSchedulerKey;
   private long creationTime;
   private long finishTime;
   private ContainerStatus finishedStatus;
@@ -187,6 +187,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   private volatile String queueName;
 
   private boolean isExternallyAllocated;
+  private SchedulerRequestKey allocatedSchedulerKey;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -226,6 +227,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
     this.containerId = container.getId();
     this.nodeId = nodeId;
     this.container = container;
+    this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container);
     this.appAttemptId = appAttemptId;
     this.user = user;
     this.creationTime = creationTime;
@@ -296,8 +298,8 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   }
 
   @Override
-  public Priority getReservedPriority() {
-    return reservedPriority;
+  public SchedulerRequestKey getReservedSchedulerKey() {
+    return reservedSchedulerKey;
   }
 
   @Override
@@ -326,6 +328,11 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   }
 
   @Override
+  public SchedulerRequestKey getAllocatedSchedulerKey() {
+    return allocatedSchedulerKey;
+  }
+
+  @Override
   public Priority getAllocatedPriority() {
     return container.getPriority();
   }
@@ -535,7 +542,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
       RMContainerReservedEvent e = (RMContainerReservedEvent)event;
       container.reservedResource = e.getReservedResource();
       container.reservedNode = e.getReservedNode();
-      container.reservedPriority = e.getReservedPriority();
+      container.reservedSchedulerKey = e.getReservedSchedulerKey();
       
       if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
           .contains(container.getState())) {
@@ -768,7 +775,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
     try {
       containerReport = ContainerReport.newInstance(this.getContainerId(),
           this.getAllocatedResource(), this.getAllocatedNode(),
-          this.getAllocatedPriority(), this.getCreationTime(),
+          this.getAllocatedSchedulerKey().getPriority(), 
this.getCreationTime(),
           this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(),
           this.getContainerExitStatus(), this.getContainerState(),
           this.getNodeHttpAddress());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
index 74e2dc4..d7d1e94 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
@@ -20,8 +20,8 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 
 /**
  * The event signifying that a container has been reserved.
@@ -33,15 +33,15 @@ public class RMContainerReservedEvent extends 
RMContainerEvent {
 
   private final Resource reservedResource;
   private final NodeId reservedNode;
-  private final Priority reservedPriority;
+  private final SchedulerRequestKey reservedSchedulerKey;
   
   public RMContainerReservedEvent(ContainerId containerId,
       Resource reservedResource, NodeId reservedNode, 
-      Priority reservedPriority) {
+      SchedulerRequestKey reservedSchedulerKey) {
     super(containerId, RMContainerEventType.RESERVED);
     this.reservedResource = reservedResource;
     this.reservedNode = reservedNode;
-    this.reservedPriority = reservedPriority;
+    this.reservedSchedulerKey = reservedSchedulerKey;
   }
 
   public Resource getReservedResource() {
@@ -52,8 +52,8 @@ public class RMContainerReservedEvent extends 
RMContainerEvent {
     return reservedNode;
   }
 
-  public Priority getReservedPriority() {
-    return reservedPriority;
+  public SchedulerRequestKey getReservedSchedulerKey() {
+    return reservedSchedulerKey;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 8d42c97..3764664 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,7 +20,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -60,8 +58,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
   
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
-  private static final Comparator<Priority> COMPARATOR =
-      new 
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
   private static final int EPOCH_BIT_SHIFT = 40;
 
   private final ApplicationId applicationId;
@@ -82,10 +78,10 @@ public class AppSchedulingInfo {
 
   private Set<String> requestedPartitions = new HashSet<>();
 
-  final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
-  final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
-      new ConcurrentHashMap<>();
-  final Map<NodeId, Map<Priority, Map<ContainerId,
+  final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>();
+  final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
+      resourceRequestMap = new ConcurrentHashMap<>();
+  final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
       SchedContainerChangeRequest>>> containerIncreaseRequestMap =
       new ConcurrentHashMap<>();
 
@@ -134,22 +130,23 @@ public class AppSchedulingInfo {
    * Clear any pending requests from this application.
    */
   private synchronized void clearRequests() {
-    priorities.clear();
+    schedulerKeys.clear();
     resourceRequestMap.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
   public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
-    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> 
requestsOnNode =
-        containerIncreaseRequestMap.get(nodeId);
+    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
     return requestsOnNode == null ? false : requestsOnNode.size() > 0;
   }
-  
+
   public synchronized Map<ContainerId, SchedContainerChangeRequest>
-      getIncreaseRequests(NodeId nodeId, Priority priority) {
-    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> 
requestsOnNode =
-        containerIncreaseRequestMap.get(nodeId);
-    return requestsOnNode == null ? null : requestsOnNode.get(priority);
+      getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
+    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+    return requestsOnNode == null ? null : requestsOnNode.get(
+        schedulerKey);
   }
 
   /**
@@ -175,15 +172,17 @@ public class AppSchedulingInfo {
       }
       NodeId nodeId = r.getRMContainer().getAllocatedNode();
 
-      Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> 
requestsOnNode =
-          containerIncreaseRequestMap.get(nodeId);
+      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
       if (null == requestsOnNode) {
         requestsOnNode = new TreeMap<>();
         containerIncreaseRequestMap.put(nodeId, requestsOnNode);
       }
 
       SchedContainerChangeRequest prevChangeRequest =
-          getIncreaseRequest(nodeId, r.getPriority(), r.getContainerId());
+          getIncreaseRequest(nodeId,
+              r.getRMContainer().getAllocatedSchedulerKey(),
+              r.getContainerId());
       if (null != prevChangeRequest) {
         if (Resources.equals(prevChangeRequest.getTargetCapacity(),
             r.getTargetCapacity())) {
@@ -192,7 +191,8 @@ public class AppSchedulingInfo {
         }
 
         // remove the old one, as we will use the new one going forward
-        removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
+        removeIncreaseRequest(nodeId,
+            prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
             prevChangeRequest.getContainerId());
       }
 
@@ -219,21 +219,22 @@ public class AppSchedulingInfo {
    */
   private void insertIncreaseRequest(SchedContainerChangeRequest request) {
     NodeId nodeId = request.getNodeId();
-    Priority priority = request.getPriority();
+    SchedulerRequestKey schedulerKey =
+        request.getRMContainer().getAllocatedSchedulerKey();
     ContainerId containerId = request.getContainerId();
     
-    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> 
requestsOnNode =
-        containerIncreaseRequestMap.get(nodeId);
+    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       requestsOnNode = new HashMap<>();
       containerIncreaseRequestMap.put(nodeId, requestsOnNode);
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(priority);
+        requestsOnNode.get(schedulerKey);
     if (null == requestsOnNodeWithPriority) {
       requestsOnNodeWithPriority = new TreeMap<>();
-      requestsOnNode.put(priority, requestsOnNodeWithPriority);
+      requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
     }
 
     requestsOnNodeWithPriority.put(containerId, request);
@@ -249,20 +250,20 @@ public class AppSchedulingInfo {
           + " delta=" + delta);
     }
     
-    // update priorities
-    priorities.add(priority);
+    // update Scheduler Keys
+    schedulerKeys.add(schedulerKey);
   }
   
-  public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority 
priority,
-      ContainerId containerId) {
-    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> 
requestsOnNode =
-        containerIncreaseRequestMap.get(nodeId);
+  public synchronized boolean removeIncreaseRequest(NodeId nodeId,
+      SchedulerRequestKey schedulerKey, ContainerId containerId) {
+    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       return false;
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(priority);
+        requestsOnNode.get(schedulerKey);
     if (null == requestsOnNodeWithPriority) {
       return false;
     }
@@ -272,7 +273,7 @@ public class AppSchedulingInfo {
     
     // remove hierarchies if it becomes empty
     if (requestsOnNodeWithPriority.isEmpty()) {
-      requestsOnNode.remove(priority);
+      requestsOnNode.remove(schedulerKey);
     }
     if (requestsOnNode.isEmpty()) {
       containerIncreaseRequestMap.remove(nodeId);
@@ -296,15 +297,15 @@ public class AppSchedulingInfo {
   }
   
   public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
-      Priority priority, ContainerId containerId) {
-    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> 
requestsOnNode =
-        containerIncreaseRequestMap.get(nodeId);
+      SchedulerRequestKey schedulerKey, ContainerId containerId) {
+    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       return null;
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(priority);
+        requestsOnNode.get(schedulerKey);
     return requestsOnNodeWithPriority == null ? null
         : requestsOnNodeWithPriority.get(containerId);
   }
@@ -328,17 +329,18 @@ public class AppSchedulingInfo {
 
     // Update resource requests
     for (ResourceRequest request : requests) {
-      Priority priority = request.getPriority();
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
       String resourceName = request.getResourceName();
 
       // Update node labels if required
       updateNodeLabels(request);
 
-      Map<String, ResourceRequest> asks = 
this.resourceRequestMap.get(priority);
+      Map<String, ResourceRequest> asks =
+          this.resourceRequestMap.get(schedulerKey);
       if (asks == null) {
         asks = new ConcurrentHashMap<>();
-        this.resourceRequestMap.put(priority, asks);
-        this.priorities.add(priority);
+        this.resourceRequestMap.put(schedulerKey, asks);
+        this.schedulerKeys.add(schedulerKey);
       }
 
       // Increment number of containers if recovering preempted resources
@@ -405,11 +407,11 @@ public class AppSchedulingInfo {
   }
 
   private void updateNodeLabels(ResourceRequest request) {
-    Priority priority = request.getPriority();
+    SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
     String resourceName = request.getResourceName();
     if (resourceName.equals(ResourceRequest.ANY)) {
       ResourceRequest previousAnyRequest =
-          getResourceRequest(priority, resourceName);
+          getResourceRequest(schedulerKey, resourceName);
 
       // When there is change in ANY request label expression, we should
       // update label for all resource requests already added of same
@@ -417,7 +419,7 @@ public class AppSchedulingInfo {
       if ((null == previousAnyRequest)
           || hasRequestLabelChanged(previousAnyRequest, request)) {
         Map<String, ResourceRequest> resourceRequest =
-            getResourceRequests(priority);
+            getResourceRequests(schedulerKey);
         if (resourceRequest != null) {
           for (ResourceRequest r : resourceRequest.values()) {
             if (!r.getResourceName().equals(ResourceRequest.ANY)) {
@@ -428,7 +430,7 @@ public class AppSchedulingInfo {
       }
     } else {
       ResourceRequest anyRequest =
-          getResourceRequest(priority, ResourceRequest.ANY);
+          getResourceRequest(schedulerKey, ResourceRequest.ANY);
       if (anyRequest != null) {
         request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
       }
@@ -501,13 +503,13 @@ public class AppSchedulingInfo {
     return userBlacklistChanged.getAndSet(false);
   }
 
-  public synchronized Collection<Priority> getPriorities() {
-    return priorities;
+  public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
+    return schedulerKeys;
   }
 
   public synchronized Map<String, ResourceRequest> getResourceRequests(
-      Priority priority) {
-    return resourceRequestMap.get(priority);
+      SchedulerRequestKey schedulerKey) {
+    return resourceRequestMap.get(schedulerKey);
   }
 
   public synchronized List<ResourceRequest> getAllResourceRequests() {
@@ -518,14 +520,16 @@ public class AppSchedulingInfo {
     return ret;
   }
 
-  public synchronized ResourceRequest getResourceRequest(Priority priority,
-      String resourceName) {
-    Map<String, ResourceRequest> nodeRequests = 
resourceRequestMap.get(priority);
+  public synchronized ResourceRequest getResourceRequest(
+      SchedulerRequestKey schedulerKey, String resourceName) {
+    Map<String, ResourceRequest> nodeRequests =
+        resourceRequestMap.get(schedulerKey);
     return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
   }
 
-  public synchronized Resource getResource(Priority priority) {
-    ResourceRequest request = getResourceRequest(priority, 
ResourceRequest.ANY);
+  public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
+    ResourceRequest request =
+        getResourceRequest(schedulerKey, ResourceRequest.ANY);
     return (request == null) ? null : request.getCapability();
   }
 
@@ -555,7 +559,8 @@ public class AppSchedulingInfo {
   public synchronized void increaseContainer(
       SchedContainerChangeRequest increaseRequest) {
     NodeId nodeId = increaseRequest.getNodeId();
-    Priority priority = increaseRequest.getPriority();
+    SchedulerRequestKey schedulerKey =
+        increaseRequest.getRMContainer().getAllocatedSchedulerKey();
     ContainerId containerId = increaseRequest.getContainerId();
     Resource deltaCapacity = increaseRequest.getDeltaCapacity();
 
@@ -568,7 +573,7 @@ public class AppSchedulingInfo {
     // Set queue metrics
     queue.getMetrics().allocateResources(user, deltaCapacity);
     // remove the increase request from pending increase request map
-    removeIncreaseRequest(nodeId, priority, containerId);
+    removeIncreaseRequest(nodeId, schedulerKey, containerId);
     // update usage
     appResourceUsage.incUsed(increaseRequest.getNodePartition(), 
deltaCapacity);
   }
@@ -591,19 +596,25 @@ public class AppSchedulingInfo {
     // update usage
     appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
   }
-  
+
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
+   * @param type Node Type
+   * @param node SchedulerNode
+   * @param schedulerKey SchedulerRequestKey
+   * @param request ResourceRequest
+   * @param containerAllocated Container Allocated
+   * @return List of ResourceRequests
    */
   public synchronized List<ResourceRequest> allocate(NodeType type,
-      SchedulerNode node, Priority priority, ResourceRequest request,
-      Container containerAllocated) {
+      SchedulerNode node, SchedulerRequestKey schedulerKey,
+      ResourceRequest request, Container containerAllocated) {
     List<ResourceRequest> resourceRequests = new ArrayList<>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, resourceRequests);
+      allocateNodeLocal(node, schedulerKey, request, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, resourceRequests);
+      allocateRackLocal(node, schedulerKey, request, resourceRequests);
     } else {
       allocateOffSwitch(request, resourceRequests);
     }
@@ -633,16 +644,16 @@ public class AppSchedulingInfo {
    * application.
    */
   private synchronized void allocateNodeLocal(SchedulerNode node,
-      Priority priority, ResourceRequest nodeLocalRequest,
+      SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
-    decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
+    decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest);
 
-    ResourceRequest rackLocalRequest = resourceRequestMap.get(priority).get(
+    ResourceRequest rackLocalRequest = resourceRequestMap.get(schedulerKey).get(
         node.getRackName());
-    decResourceRequest(node.getRackName(), priority, rackLocalRequest);
+    decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
 
-    ResourceRequest offRackRequest = resourceRequestMap.get(priority).get(
+    ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
     decrementOutstanding(offRackRequest);
 
@@ -652,11 +663,11 @@ public class AppSchedulingInfo {
     resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
-  private void decResourceRequest(String resourceName, Priority priority,
-      ResourceRequest request) {
+  private void decResourceRequest(String resourceName,
+      SchedulerRequestKey schedulerKey, ResourceRequest request) {
     request.setNumContainers(request.getNumContainers() - 1);
     if (request.getNumContainers() == 0) {
-      resourceRequestMap.get(priority).remove(resourceName);
+      resourceRequestMap.get(schedulerKey).remove(resourceName);
     }
   }
 
@@ -665,12 +676,12 @@ public class AppSchedulingInfo {
    * application.
    */
   private synchronized void allocateRackLocal(SchedulerNode node,
-      Priority priority, ResourceRequest rackLocalRequest,
+      SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
-    decResourceRequest(node.getRackName(), priority, rackLocalRequest);
+    decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
     
-    ResourceRequest offRackRequest = resourceRequestMap.get(priority).get(
+    ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
     decrementOutstanding(offRackRequest);
 
@@ -712,8 +723,9 @@ public class AppSchedulingInfo {
   
   private synchronized void checkForDeactivation() {
     boolean deactivate = true;
-    for (Priority priority : getPriorities()) {
-      ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
+    for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
+      ResourceRequest request =
+          getResourceRequest(schedulerKey, ResourceRequest.ANY);
       if (request != null) {
         if (request.getNumContainers() > 0) {
           deactivate = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b4a2639..c4b32a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -98,10 +98,11 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   protected ApplicationAttemptId attemptId;
   protected Map<ContainerId, RMContainer> liveContainers =
       new HashMap<ContainerId, RMContainer>();
-  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
+  protected final Map<SchedulerRequestKey, Map<NodeId, RMContainer>>
+      reservedContainers = new HashMap<>();
 
-  private final Multiset<Priority> reReservations = HashMultiset.create();
+  private final Multiset<SchedulerRequestKey> reReservations =
+      HashMultiset.create();
   
   private Resource resourceLimit = Resource.newInstance(0, 0);
   private boolean unmanagedAM = true;
@@ -137,7 +138,7 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
    * the application successfully schedules a task (at rack or node local), it
    * is reset to 0.
    */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+  Multiset<SchedulerRequestKey> schedulingOpportunities = HashMultiset.create();
   
   /**
    * Count how many times the application has been given an opportunity to
@@ -146,12 +147,12 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
    * incremented, and each time the application successfully schedules a task,
    * it is reset to 0 when schedule any task at corresponding priority.
    */
-  Multiset<Priority> missedNonPartitionedRequestSchedulingOpportunity =
+  Multiset<SchedulerRequestKey> missedNonPartitionedReqSchedulingOpportunity =
       HashMultiset.create();
   
   // Time of the last container scheduled at the current allowed level
-  protected Map<Priority, Long> lastScheduledContainer =
-      new HashMap<Priority, Long>();
+  protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
+      new HashMap<>();
 
   protected Queue queue;
   protected boolean isStopped = false;
@@ -225,8 +226,9 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     return appSchedulingInfo.getUser();
   }
 
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return appSchedulingInfo.getResourceRequests(priority);
+  public Map<String, ResourceRequest> getResourceRequests(
+      SchedulerRequestKey schedulerKey) {
+    return appSchedulingInfo.getResourceRequests(schedulerKey);
   }
 
   public Set<ContainerId> getPendingRelease() {
@@ -237,22 +239,24 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     return appSchedulingInfo.getNewContainerId();
   }
 
-  public Collection<Priority> getPriorities() {
-    return appSchedulingInfo.getPriorities();
+  public Collection<SchedulerRequestKey> getSchedulerKeys() {
+    return appSchedulingInfo.getSchedulerKeys();
   }
   
-  public synchronized ResourceRequest getResourceRequest(Priority priority,
-      String resourceName) {
-    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+  public synchronized ResourceRequest getResourceRequest(
+      SchedulerRequestKey schedulerKey, String resourceName) {
+    return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName);
   }
 
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
+  public synchronized int getTotalRequiredResources(
+      SchedulerRequestKey schedulerKey) {
+    ResourceRequest request =
+        getResourceRequest(schedulerKey, ResourceRequest.ANY);
     return request == null ? 0 : request.getNumContainers();
   }
 
-  public synchronized Resource getResource(Priority priority) {
-    return appSchedulingInfo.getResource(priority);
+  public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
+    return appSchedulingInfo.getResource(schedulerKey);
   }
 
   public String getQueueName() {
@@ -308,16 +312,18 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     }
   }
 
-  protected synchronized void resetReReservations(Priority priority) {
-    reReservations.setCount(priority, 0);
+  protected synchronized void resetReReservations(
+      SchedulerRequestKey schedulerKey) {
+    reReservations.setCount(schedulerKey, 0);
   }
 
-  protected synchronized void addReReservation(Priority priority) {
-    reReservations.add(priority);
+  protected synchronized void addReReservation(
+      SchedulerRequestKey schedulerKey) {
+    reReservations.add(schedulerKey);
   }
 
-  public synchronized int getReReservations(Priority priority) {
-    return reReservations.count(priority);
+  public synchronized int getReReservations(SchedulerRequestKey schedulerKey) {
+    return reReservations.count(schedulerKey);
   }
 
   /**
@@ -366,7 +372,7 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
    */
   public synchronized List<RMContainer> getReservedContainers() {
     List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
+    for (Map.Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
       this.reservedContainers.entrySet()) {
       reservedContainers.addAll(e.getValue().values());
     }
@@ -374,8 +380,9 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   }
   
   public synchronized boolean reserveIncreasedContainer(SchedulerNode node,
-      Priority priority, RMContainer rmContainer, Resource reservedResource) {
-    if (commonReserve(node, priority, rmContainer, reservedResource)) {
+      SchedulerRequestKey schedulerKey, RMContainer rmContainer,
+      Resource reservedResource) {
+    if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
       attemptResourceUsage.incReserved(node.getPartition(),
           reservedResource);
       // succeeded
@@ -386,10 +393,11 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   }
   
   private synchronized boolean commonReserve(SchedulerNode node,
-      Priority priority, RMContainer rmContainer, Resource reservedResource) {
+      SchedulerRequestKey schedulerKey, RMContainer rmContainer,
+      Resource reservedResource) {
     try {
       rmContainer.handle(new RMContainerReservedEvent(rmContainer
-          .getContainerId(), reservedResource, node.getNodeID(), priority));
+          .getContainerId(), reservedResource, node.getNodeID(), schedulerKey));
     } catch (InvalidStateTransitionException e) {
       // We reach here could be caused by container already finished, return
       // false indicate it fails
@@ -397,10 +405,10 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     }
     
     Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
+        this.reservedContainers.get(schedulerKey);
     if (reservedContainers == null) {
       reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
+      this.reservedContainers.put(schedulerKey, reservedContainers);
     }
     reservedContainers.put(node.getNodeID(), rmContainer);
 
@@ -408,7 +416,7 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
       LOG.debug("Application attempt " + getApplicationAttemptId()
           + " reserved container " + rmContainer + " on node " + node
           + ". This attempt currently has " + reservedContainers.size()
-          + " reserved containers at priority " + priority
+          + " reserved containers at priority " + schedulerKey.getPriority()
           + "; currentReservation " + reservedResource);
     }
     
@@ -416,7 +424,8 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   }
   
   public synchronized RMContainer reserve(SchedulerNode node,
-      Priority priority, RMContainer rmContainer, Container container) {
+      SchedulerRequestKey schedulerKey, RMContainer rmContainer,
+      Container container) {
     // Create RMContainer if necessary
     if (rmContainer == null) {
       rmContainer =
@@ -427,13 +436,13 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
       ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
       // Reset the re-reservation count
-      resetReReservations(priority);
+      resetReReservations(schedulerKey);
     } else {
       // Note down the re-reservation
-      addReReservation(priority);
+      addReReservation(schedulerKey);
     }
     
-    commonReserve(node, priority, rmContainer, container.getResource());
+    commonReserve(node, schedulerKey, rmContainer, container.getResource());
 
     return rmContainer;
   }
@@ -442,12 +451,13 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
    * Has the application reserved the given <code>node</code> at the
    * given <code>priority</code>?
    * @param node node to be checked
-   * @param priority priority of reserved container
+   * @param schedulerKey scheduler key of reserved container
    * @return true is reserved, false if not
    */
-  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+  public synchronized boolean isReserved(SchedulerNode node,
+      SchedulerRequestKey schedulerKey) {
     Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
+        this.reservedContainers.get(schedulerKey);
     if (reservedContainers != null) {
       return reservedContainers.containsKey(node.getNodeID());
     }
@@ -471,9 +481,10 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     return resourceLimit;
   }
   
-  public synchronized int getNumReservedContainers(Priority priority) {
+  public synchronized int getNumReservedContainers(
+      SchedulerRequestKey schedulerKey) {
     Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
+        this.reservedContainers.get(schedulerKey);
     return (reservedContainers == null) ? 0 : reservedContainers.size();
   }
   
@@ -495,8 +506,9 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   
   public synchronized void showRequests() {
     if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+      for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
+        Map<String, ResourceRequest> requests =
+            getResourceRequests(schedulerKey);
         if (requests != null) {
           LOG.debug("showRequests:" + " application=" + getApplicationId()
               + " headRoom=" + getHeadroom() + " currentConsumption="
@@ -635,59 +647,66 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   }
 
   public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
-      Priority priority) {
-    missedNonPartitionedRequestSchedulingOpportunity.add(priority);
-    return missedNonPartitionedRequestSchedulingOpportunity.count(priority);
+      SchedulerRequestKey schedulerKey) {
+    missedNonPartitionedReqSchedulingOpportunity.add(schedulerKey);
+    return missedNonPartitionedReqSchedulingOpportunity.count(schedulerKey);
   }
 
   public synchronized void
-      resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) {
-    missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0);
+      resetMissedNonPartitionedRequestSchedulingOpportunity(
+      SchedulerRequestKey schedulerKey) {
+    missedNonPartitionedReqSchedulingOpportunity.setCount(schedulerKey, 0);
   }
 
   
-  public synchronized void addSchedulingOpportunity(Priority priority) {
-    int count = schedulingOpportunities.count(priority);
+  public synchronized void addSchedulingOpportunity(
+      SchedulerRequestKey schedulerKey) {
+    int count = schedulingOpportunities.count(schedulerKey);
     if (count < Integer.MAX_VALUE) {
-      schedulingOpportunities.setCount(priority, count + 1);
+      schedulingOpportunities.setCount(schedulerKey, count + 1);
     }
   }
   
-  public synchronized void subtractSchedulingOpportunity(Priority priority) {
-    int count = schedulingOpportunities.count(priority) - 1;
-    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
+  public synchronized void subtractSchedulingOpportunity(
+      SchedulerRequestKey schedulerKey) {
+    int count = schedulingOpportunities.count(schedulerKey) - 1;
+    this.schedulingOpportunities.setCount(schedulerKey, Math.max(count,  0));
   }
 
   /**
    * Return the number of times the application has been given an opportunity
    * to schedule a task at the given priority since the last time it
    * successfully did so.
+   * @param schedulerKey Scheduler Key
+   * @return number of scheduling opportunities
    */
-  public synchronized int getSchedulingOpportunities(Priority priority) {
-    return schedulingOpportunities.count(priority);
+  public synchronized int getSchedulingOpportunities(
+      SchedulerRequestKey schedulerKey) {
+    return schedulingOpportunities.count(schedulerKey);
   }
   
   /**
-   * Should be called when an application has successfully scheduled a container,
-   * or when the scheduling locality threshold is relaxed.
+   * Should be called when an application has successfully scheduled a
+   * container, or when the scheduling locality threshold is relaxed.
    * Reset various internal counters which affect delay scheduling
    *
-   * @param priority The priority of the container scheduled.
+   * @param schedulerKey The scheduler key of the container scheduled.
    */
-  public synchronized void resetSchedulingOpportunities(Priority priority) {
-    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  public synchronized void resetSchedulingOpportunities(
+      SchedulerRequestKey schedulerKey) {
+    resetSchedulingOpportunities(schedulerKey, System.currentTimeMillis());
   }
 
   // used for continuous scheduling
-  public synchronized void resetSchedulingOpportunities(Priority priority,
-      long currentTimeMs) {
-    lastScheduledContainer.put(priority, currentTimeMs);
-    schedulingOpportunities.setCount(priority, 0);
+  public synchronized void resetSchedulingOpportunities(
+      SchedulerRequestKey schedulerKey, long currentTimeMs) {
+    lastScheduledContainer.put(schedulerKey, currentTimeMs);
+    schedulingOpportunities.setCount(schedulerKey, 0);
   }
 
   @VisibleForTesting
-  void setSchedulingOpportunities(Priority priority, int count) {
-    schedulingOpportunities.setCount(priority, count);
+  void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) {
+    schedulingOpportunities.setCount(schedulerKey, count);
   }
 
   synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
@@ -747,7 +766,8 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     return this.resourceLimit;
   }
 
-  public synchronized Map<Priority, Long> getLastScheduledContainer() {
+  public synchronized Map<SchedulerRequestKey, Long>
+      getLastScheduledContainer() {
     return this.lastScheduledContainer;
   }
 
@@ -892,8 +912,8 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   }
   
   public synchronized boolean removeIncreaseRequest(NodeId nodeId,
-      Priority priority, ContainerId containerId) {
-    return appSchedulingInfo.removeIncreaseRequest(nodeId, priority,
+      SchedulerRequestKey schedulerKey, ContainerId containerId) {
+    return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
         containerId);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 1f57e07..2efdbd0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -31,7 +31,6 @@ import 
org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -308,11 +307,11 @@ public abstract class SchedulerNode {
   /**
    * Reserve container for the attempt on this node.
    * @param attempt Application attempt asking for the reservation.
-   * @param priority Priority of the reservation.
+   * @param schedulerKey Scheduler key of the reservation.
    * @param container Container reserving resources for.
    */
   public abstract void reserveResource(SchedulerApplicationAttempt attempt,
-      Priority priority, RMContainer container);
+      SchedulerRequestKey schedulerKey, RMContainer container);
 
   /**
    * Unreserve resources on this node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
new file mode 100644
index 0000000..b4988be
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Composite key for outstanding scheduler requests for any schedulable entity.
+ * Currently it includes {@link Priority}.
+ */
+public final class SchedulerRequestKey implements
+    Comparable<SchedulerRequestKey> {
+
+  private final Priority priority;
+
+  public static final SchedulerRequestKey UNDEFINED =
+      new SchedulerRequestKey(Priority.UNDEFINED);
+
+  /**
+   * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
+   * @param req ResourceRequest
+   * @return SchedulerRequestKey
+   */
+  public static SchedulerRequestKey create(ResourceRequest req) {
+    return new SchedulerRequestKey(req.getPriority());
+  }
+
+  /**
+   * Convenience method to extract the SchedulerRequestKey used to schedule the
+   * Container.
+   * @param container Container
+   * @return SchedulerRequestKey
+   */
+  public static SchedulerRequestKey extractFrom(Container container) {
+    return new SchedulerRequestKey(container.getPriority());
+  }
+
+  private SchedulerRequestKey(Priority priority) {
+    this.priority = priority;
+  }
+
+  /**
+   * Get the {@link Priority} of the request.
+   *
+   * @return the {@link Priority} of the request
+   */
+  public Priority getPriority() {
+    return priority;
+  }
+
+  @Override
+  public int compareTo(SchedulerRequestKey o) {
+    if (o == null) {
+      return (priority != null) ? -1 : 0;
+    } else {
+      if (priority == null) {
+        return 1;
+      }
+    }
+    return o.getPriority().compareTo(priority);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SchedulerRequestKey)) {
+      return false;
+    }
+
+    SchedulerRequestKey that = (SchedulerRequestKey) o;
+    return getPriority().equals(that.getPriority());
+
+  }
+
+  @Override
+  public int hashCode() {
+    return getPriority().hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6dcafec..9aae909 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1265,7 +1265,8 @@ public class LeafQueue extends AbstractCSQueue {
       }
 
       if (null != priority) {
-        removed = app.unreserve(rmContainer.getContainer().getPriority(), node,
+        removed = app.unreserve(
+            rmContainer.getAllocatedSchedulerKey(), node,
             rmContainer);
       }
 
@@ -1321,7 +1322,7 @@ public class LeafQueue extends AbstractCSQueue {
       
       // Remove container increase request if it exists
       application.removeIncreaseRequest(node.getNodeID(),
-          rmContainer.getAllocatedPriority(), rmContainer.getContainerId());
+          rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainerId());
 
       boolean removed = false;
 
@@ -1335,7 +1336,7 @@ public class LeafQueue extends AbstractCSQueue {
         // happen under scheduler's lock... 
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
-          removed = application.unreserve(rmContainer.getReservedPriority(),
+          removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
               node, rmContainer);
         } else {
           removed =
@@ -1785,7 +1786,8 @@ public class LeafQueue extends AbstractCSQueue {
       // Do we have increase request for the same container? If so, remove it
       boolean hasIncreaseRequest =
           app.removeIncreaseRequest(decreaseRequest.getNodeId(),
-              decreaseRequest.getPriority(), decreaseRequest.getContainerId());
+              decreaseRequest.getRMContainer().getAllocatedSchedulerKey(),
+              decreaseRequest.getContainerId());
       if (hasIncreaseRequest) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("While processing decrease requests, found an increase"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
index 25e5824..4a2ae18 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -37,6 +36,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -115,7 +116,8 @@ public class IncreaseContainerAllocator extends 
AbstractContainerAllocator {
         node.getUnallocatedResource())) {
       // OK, we can allocate this increase request
       // Unreserve it first
-      application.unreserve(increaseRequest.getPriority(),
+      application.unreserve(
+          increaseRequest.getRMContainer().getAllocatedSchedulerKey(),
           (FiCaSchedulerNode) node, increaseRequest.getRMContainer());
       
       // Notify application
@@ -152,7 +154,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
       return createSuccessfullyIncreasedCSAssignment(increaseRequest, false);
     } else {
       boolean reservationSucceeded =
-          application.reserveIncreasedContainer(increaseRequest.getPriority(),
+          application.reserveIncreasedContainer(
+              increaseRequest.getRMContainer().getAllocatedSchedulerKey(),
               node, increaseRequest.getRMContainer(),
               increaseRequest.getDeltaCapacity());
       
@@ -228,11 +231,11 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
        * priority, but will skip increase request and move to next increase
        * request if queue-limit or user-limit aren't satisfied 
        */
-      for (Priority priority : application.getPriorities()) {
+      for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Looking at increase request for application="
               + application.getApplicationAttemptId() + " priority="
-              + priority);
+              + schedulerKey.getPriority());
         }
 
         /*
@@ -242,14 +245,14 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
          * cannot be allocated.
          */
         Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap =
-            sinfo.getIncreaseRequests(nodeId, priority);
+            sinfo.getIncreaseRequests(nodeId, schedulerKey);
 
         // We don't have more increase request on this priority, skip..
         if (null == increaseRequestMap) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("There's no increase request for "
                 + application.getApplicationAttemptId() + " priority="
-                + priority);
+                + schedulerKey.getPriority());
           }
           continue;
         }
@@ -318,7 +321,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
         // Remove invalid in request requests
         if (!toBeRemovedRequests.isEmpty()) {
           for (SchedContainerChangeRequest req : toBeRemovedRequests) {
-            sinfo.removeIncreaseRequest(req.getNodeId(), req.getPriority(),
+            sinfo.removeIncreaseRequest(req.getNodeId(),
+                req.getRMContainer().getAllocatedSchedulerKey(),
                 req.getContainerId());
           }
         }
@@ -337,8 +341,9 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
       
       // We already reserved this increase container
       SchedContainerChangeRequest request =
-          sinfo.getIncreaseRequest(nodeId, reservedContainer.getContainer()
-              .getPriority(), reservedContainer.getContainerId());
+          sinfo.getIncreaseRequest(nodeId,
+              reservedContainer.getAllocatedSchedulerKey(),
+              reservedContainer.getContainerId());
       
       // We will cancel the reservation any of following happens
       // - Container finished


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to