YARN-5392. Replace use of Priority in the Scheduling infrastructure with an opaque ShedulerRequestKey. (asuresh and subru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5aace38b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5aace38b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5aace38b Branch: refs/heads/HADOOP-12756 Commit: 5aace38b748ba71aaadd2c4d64eba8dc1f816828 Parents: d2cf8b5 Author: Arun Suresh <asur...@apache.org> Authored: Thu Jul 21 20:57:44 2016 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Tue Jul 26 14:54:03 2016 -0700 ---------------------------------------------------------------------- .../capacity/FifoCandidatesSelector.java | 11 +- .../resourcemanager/resource/Priority.java | 10 +- .../rmcontainer/RMContainer.java | 7 +- .../rmcontainer/RMContainerImpl.java | 19 +- .../rmcontainer/RMContainerReservedEvent.java | 12 +- .../scheduler/AppSchedulingInfo.java | 160 +++++++------- .../scheduler/SchedulerApplicationAttempt.java | 160 ++++++++------ .../scheduler/SchedulerNode.java | 5 +- .../scheduler/SchedulerRequestKey.java | 99 +++++++++ .../scheduler/capacity/LeafQueue.java | 10 +- .../allocator/IncreaseContainerAllocator.java | 25 ++- .../allocator/RegularContainerAllocator.java | 181 ++++++++-------- .../scheduler/common/fica/FiCaSchedulerApp.java | 56 ++--- .../common/fica/FiCaSchedulerNode.java | 5 +- .../scheduler/fair/FSAppAttempt.java | 213 +++++++++++-------- .../scheduler/fair/FSSchedulerNode.java | 5 +- .../scheduler/fair/FairScheduler.java | 2 +- .../scheduler/fifo/FifoScheduler.java | 87 ++++---- .../server/resourcemanager/Application.java | 120 ++++++----- .../yarn/server/resourcemanager/Task.java | 11 +- ...alCapacityPreemptionPolicyMockFramework.java | 4 + ...estProportionalCapacityPreemptionPolicy.java | 4 + .../rmcontainer/TestRMContainerImpl.java | 6 +- .../TestSchedulerApplicationAttempt.java | 31 +-- .../capacity/TestCapacityScheduler.java | 10 +- .../scheduler/capacity/TestLeafQueue.java | 170 ++++++++------- .../TestNodeLabelContainerAllocation.java | 4 +- .../scheduler/capacity/TestReservations.java | 52 +++-- .../scheduler/capacity/TestUtils.java | 12 ++ .../scheduler/fair/TestFSAppAttempt.java | 18 +- .../scheduler/fair/TestFairScheduler.java | 15 +- .../fair/TestFairSchedulerPreemption.java | 24 ++- 32 files changed, 925 insertions(+), 623 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index a8c62fd..9df395d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -23,7 +23,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -343,12 +342,10 @@ public class FifoCandidatesSelector Collections.sort(containers, new Comparator<RMContainer>() { @Override public int compare(RMContainer a, RMContainer b) { - Comparator<Priority> c = new org.apache.hadoop.yarn.server - .resourcemanager.resource.Priority.Comparator(); - int priorityComp = c.compare(b.getContainer().getPriority(), - a.getContainer().getPriority()); - if (priorityComp != 0) { - return priorityComp; + int schedKeyComp = b.getAllocatedSchedulerKey() + .compareTo(a.getAllocatedSchedulerKey()); + if (schedKeyComp != 0) { + return schedKeyComp; } return b.getContainerId().compareTo(a.getContainerId()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java index 5060c4c..f098806 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Priority.java @@ -27,13 +27,5 @@ public class Priority { priority.setPriority(prio); return priority; } - - public static class Comparator - implements java.util.Comparator<org.apache.hadoop.yarn.api.records.Priority> { - @Override - public int compare(org.apache.hadoop.yarn.api.records.Priority o1, org.apache.hadoop.yarn.api.records.Priority o2) { - return o1.getPriority() - o2.getPriority(); - } - } - + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 504c973..e5d1208 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -31,7 +31,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; + /** * Represents the ResourceManager's view of an application container. See @@ -55,7 +56,7 @@ public interface RMContainer extends EventHandler<RMContainerEvent> { NodeId getReservedNode(); - Priority getReservedPriority(); + SchedulerRequestKey getReservedSchedulerKey(); Resource getAllocatedResource(); @@ -63,6 +64,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> { NodeId getAllocatedNode(); + SchedulerRequestKey getAllocatedSchedulerKey(); + Priority getAllocatedPriority(); long getCreationTime(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index ed819a0..706821e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -53,12 +53,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode .RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -173,7 +173,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { private Resource reservedResource; private NodeId reservedNode; - private Priority reservedPriority; + private SchedulerRequestKey reservedSchedulerKey; private long creationTime; private long finishTime; private ContainerStatus finishedStatus; @@ -187,6 +187,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { private volatile String queueName; private boolean isExternallyAllocated; + private SchedulerRequestKey allocatedSchedulerKey; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -226,6 +227,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { this.containerId = container.getId(); this.nodeId = nodeId; this.container = container; + this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container); this.appAttemptId = appAttemptId; this.user = user; this.creationTime = creationTime; @@ -296,8 +298,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { } @Override - public Priority getReservedPriority() { - return reservedPriority; + public SchedulerRequestKey getReservedSchedulerKey() { + return reservedSchedulerKey; } @Override @@ -326,6 +328,11 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { } @Override + public SchedulerRequestKey getAllocatedSchedulerKey() { + return allocatedSchedulerKey; + } + + @Override public Priority getAllocatedPriority() { return container.getPriority(); } @@ -535,7 +542,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { RMContainerReservedEvent e = (RMContainerReservedEvent)event; container.reservedResource = e.getReservedResource(); container.reservedNode = e.getReservedNode(); - container.reservedPriority = e.getReservedPriority(); + container.reservedSchedulerKey = e.getReservedSchedulerKey(); if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED) .contains(container.getState())) { @@ -768,7 +775,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { try { containerReport = ContainerReport.newInstance(this.getContainerId(), this.getAllocatedResource(), this.getAllocatedNode(), - this.getAllocatedPriority(), this.getCreationTime(), + this.getAllocatedSchedulerKey().getPriority(), this.getCreationTime(), this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(), this.getContainerExitStatus(), this.getContainerState(), this.getNodeHttpAddress()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java index 74e2dc4..d7d1e94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java @@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; /** * The event signifying that a container has been reserved. @@ -33,15 +33,15 @@ public class RMContainerReservedEvent extends RMContainerEvent { private final Resource reservedResource; private final NodeId reservedNode; - private final Priority reservedPriority; + private final SchedulerRequestKey reservedSchedulerKey; public RMContainerReservedEvent(ContainerId containerId, Resource reservedResource, NodeId reservedNode, - Priority reservedPriority) { + SchedulerRequestKey reservedSchedulerKey) { super(containerId, RMContainerEventType.RESERVED); this.reservedResource = reservedResource; this.reservedNode = reservedNode; - this.reservedPriority = reservedPriority; + this.reservedSchedulerKey = reservedSchedulerKey; } public Resource getReservedResource() { @@ -52,8 +52,8 @@ public class RMContainerReservedEvent extends RMContainerEvent { return reservedNode; } - public Priority getReservedPriority() { - return reservedPriority; + public SchedulerRequestKey getReservedSchedulerKey() { + return reservedSchedulerKey; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8d42c97..3764664 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -60,8 +58,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class AppSchedulingInfo { private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); - private static final Comparator<Priority> COMPARATOR = - new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator(); private static final int EPOCH_BIT_SHIFT = 40; private final ApplicationId applicationId; @@ -82,10 +78,10 @@ public class AppSchedulingInfo { private Set<String> requestedPartitions = new HashSet<>(); - final Set<Priority> priorities = new TreeSet<>(COMPARATOR); - final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap = - new ConcurrentHashMap<>(); - final Map<NodeId, Map<Priority, Map<ContainerId, + final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>(); + final Map<SchedulerRequestKey, Map<String, ResourceRequest>> + resourceRequestMap = new ConcurrentHashMap<>(); + final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>> containerIncreaseRequestMap = new ConcurrentHashMap<>(); @@ -134,22 +130,23 @@ public class AppSchedulingInfo { * Clear any pending requests from this application. */ private synchronized void clearRequests() { - priorities.clear(); + schedulerKeys.clear(); resourceRequestMap.clear(); LOG.info("Application " + applicationId + " requests cleared"); } public synchronized boolean hasIncreaseRequest(NodeId nodeId) { - Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = - containerIncreaseRequestMap.get(nodeId); + Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); return requestsOnNode == null ? false : requestsOnNode.size() > 0; } - + public synchronized Map<ContainerId, SchedContainerChangeRequest> - getIncreaseRequests(NodeId nodeId, Priority priority) { - Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = - containerIncreaseRequestMap.get(nodeId); - return requestsOnNode == null ? null : requestsOnNode.get(priority); + getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) { + Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); + return requestsOnNode == null ? null : requestsOnNode.get( + schedulerKey); } /** @@ -175,15 +172,17 @@ public class AppSchedulingInfo { } NodeId nodeId = r.getRMContainer().getAllocatedNode(); - Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = - containerIncreaseRequestMap.get(nodeId); + Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { requestsOnNode = new TreeMap<>(); containerIncreaseRequestMap.put(nodeId, requestsOnNode); } SchedContainerChangeRequest prevChangeRequest = - getIncreaseRequest(nodeId, r.getPriority(), r.getContainerId()); + getIncreaseRequest(nodeId, + r.getRMContainer().getAllocatedSchedulerKey(), + r.getContainerId()); if (null != prevChangeRequest) { if (Resources.equals(prevChangeRequest.getTargetCapacity(), r.getTargetCapacity())) { @@ -192,7 +191,8 @@ public class AppSchedulingInfo { } // remove the old one, as we will use the new one going forward - removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(), + removeIncreaseRequest(nodeId, + prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(), prevChangeRequest.getContainerId()); } @@ -219,21 +219,22 @@ public class AppSchedulingInfo { */ private void insertIncreaseRequest(SchedContainerChangeRequest request) { NodeId nodeId = request.getNodeId(); - Priority priority = request.getPriority(); + SchedulerRequestKey schedulerKey = + request.getRMContainer().getAllocatedSchedulerKey(); ContainerId containerId = request.getContainerId(); - Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = - containerIncreaseRequestMap.get(nodeId); + Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { requestsOnNode = new HashMap<>(); containerIncreaseRequestMap.put(nodeId, requestsOnNode); } Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = - requestsOnNode.get(priority); + requestsOnNode.get(schedulerKey); if (null == requestsOnNodeWithPriority) { requestsOnNodeWithPriority = new TreeMap<>(); - requestsOnNode.put(priority, requestsOnNodeWithPriority); + requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority); } requestsOnNodeWithPriority.put(containerId, request); @@ -249,20 +250,20 @@ public class AppSchedulingInfo { + " delta=" + delta); } - // update priorities - priorities.add(priority); + // update Scheduler Keys + schedulerKeys.add(schedulerKey); } - public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority, - ContainerId containerId) { - Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = - containerIncreaseRequestMap.get(nodeId); + public synchronized boolean removeIncreaseRequest(NodeId nodeId, + SchedulerRequestKey schedulerKey, ContainerId containerId) { + Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { return false; } Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = - requestsOnNode.get(priority); + requestsOnNode.get(schedulerKey); if (null == requestsOnNodeWithPriority) { return false; } @@ -272,7 +273,7 @@ public class AppSchedulingInfo { // remove hierarchies if it becomes empty if (requestsOnNodeWithPriority.isEmpty()) { - requestsOnNode.remove(priority); + requestsOnNode.remove(schedulerKey); } if (requestsOnNode.isEmpty()) { containerIncreaseRequestMap.remove(nodeId); @@ -296,15 +297,15 @@ public class AppSchedulingInfo { } public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, - Priority priority, ContainerId containerId) { - Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = - containerIncreaseRequestMap.get(nodeId); + SchedulerRequestKey schedulerKey, ContainerId containerId) { + Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); if (null == requestsOnNode) { return null; } Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = - requestsOnNode.get(priority); + requestsOnNode.get(schedulerKey); return requestsOnNodeWithPriority == null ? null : requestsOnNodeWithPriority.get(containerId); } @@ -328,17 +329,18 @@ public class AppSchedulingInfo { // Update resource requests for (ResourceRequest request : requests) { - Priority priority = request.getPriority(); + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); String resourceName = request.getResourceName(); // Update node labels if required updateNodeLabels(request); - Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority); + Map<String, ResourceRequest> asks = + this.resourceRequestMap.get(schedulerKey); if (asks == null) { asks = new ConcurrentHashMap<>(); - this.resourceRequestMap.put(priority, asks); - this.priorities.add(priority); + this.resourceRequestMap.put(schedulerKey, asks); + this.schedulerKeys.add(schedulerKey); } // Increment number of containers if recovering preempted resources @@ -405,11 +407,11 @@ public class AppSchedulingInfo { } private void updateNodeLabels(ResourceRequest request) { - Priority priority = request.getPriority(); + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); String resourceName = request.getResourceName(); if (resourceName.equals(ResourceRequest.ANY)) { ResourceRequest previousAnyRequest = - getResourceRequest(priority, resourceName); + getResourceRequest(schedulerKey, resourceName); // When there is change in ANY request label expression, we should // update label for all resource requests already added of same @@ -417,7 +419,7 @@ public class AppSchedulingInfo { if ((null == previousAnyRequest) || hasRequestLabelChanged(previousAnyRequest, request)) { Map<String, ResourceRequest> resourceRequest = - getResourceRequests(priority); + getResourceRequests(schedulerKey); if (resourceRequest != null) { for (ResourceRequest r : resourceRequest.values()) { if (!r.getResourceName().equals(ResourceRequest.ANY)) { @@ -428,7 +430,7 @@ public class AppSchedulingInfo { } } else { ResourceRequest anyRequest = - getResourceRequest(priority, ResourceRequest.ANY); + getResourceRequest(schedulerKey, ResourceRequest.ANY); if (anyRequest != null) { request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); } @@ -501,13 +503,13 @@ public class AppSchedulingInfo { return userBlacklistChanged.getAndSet(false); } - public synchronized Collection<Priority> getPriorities() { - return priorities; + public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() { + return schedulerKeys; } public synchronized Map<String, ResourceRequest> getResourceRequests( - Priority priority) { - return resourceRequestMap.get(priority); + SchedulerRequestKey schedulerKey) { + return resourceRequestMap.get(schedulerKey); } public synchronized List<ResourceRequest> getAllResourceRequests() { @@ -518,14 +520,16 @@ public class AppSchedulingInfo { return ret; } - public synchronized ResourceRequest getResourceRequest(Priority priority, - String resourceName) { - Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority); + public synchronized ResourceRequest getResourceRequest( + SchedulerRequestKey schedulerKey, String resourceName) { + Map<String, ResourceRequest> nodeRequests = + resourceRequestMap.get(schedulerKey); return (nodeRequests == null) ? null : nodeRequests.get(resourceName); } - public synchronized Resource getResource(Priority priority) { - ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); + public synchronized Resource getResource(SchedulerRequestKey schedulerKey) { + ResourceRequest request = + getResourceRequest(schedulerKey, ResourceRequest.ANY); return (request == null) ? null : request.getCapability(); } @@ -555,7 +559,8 @@ public class AppSchedulingInfo { public synchronized void increaseContainer( SchedContainerChangeRequest increaseRequest) { NodeId nodeId = increaseRequest.getNodeId(); - Priority priority = increaseRequest.getPriority(); + SchedulerRequestKey schedulerKey = + increaseRequest.getRMContainer().getAllocatedSchedulerKey(); ContainerId containerId = increaseRequest.getContainerId(); Resource deltaCapacity = increaseRequest.getDeltaCapacity(); @@ -568,7 +573,7 @@ public class AppSchedulingInfo { // Set queue metrics queue.getMetrics().allocateResources(user, deltaCapacity); // remove the increase request from pending increase request map - removeIncreaseRequest(nodeId, priority, containerId); + removeIncreaseRequest(nodeId, schedulerKey, containerId); // update usage appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity); } @@ -591,19 +596,25 @@ public class AppSchedulingInfo { // update usage appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); } - + /** * Resources have been allocated to this application by the resource * scheduler. Track them. + * @param type Node Type + * @param node SchedulerNode + * @param schedulerKey SchedulerRequestKey + * @param request ResourceRequest + * @param containerAllocated Container Allocated + * @return List of ResourceRequests */ public synchronized List<ResourceRequest> allocate(NodeType type, - SchedulerNode node, Priority priority, ResourceRequest request, - Container containerAllocated) { + SchedulerNode node, SchedulerRequestKey schedulerKey, + ResourceRequest request, Container containerAllocated) { List<ResourceRequest> resourceRequests = new ArrayList<>(); if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, priority, request, resourceRequests); + allocateNodeLocal(node, schedulerKey, request, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, priority, request, resourceRequests); + allocateRackLocal(node, schedulerKey, request, resourceRequests); } else { allocateOffSwitch(request, resourceRequests); } @@ -633,16 +644,16 @@ public class AppSchedulingInfo { * application. */ private synchronized void allocateNodeLocal(SchedulerNode node, - Priority priority, ResourceRequest nodeLocalRequest, + SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest, List<ResourceRequest> resourceRequests) { // Update future requirements - decResourceRequest(node.getNodeName(), priority, nodeLocalRequest); + decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest); - ResourceRequest rackLocalRequest = resourceRequestMap.get(priority).get( + ResourceRequest rackLocalRequest = resourceRequestMap.get(schedulerKey).get( node.getRackName()); - decResourceRequest(node.getRackName(), priority, rackLocalRequest); + decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest); - ResourceRequest offRackRequest = resourceRequestMap.get(priority).get( + ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); @@ -652,11 +663,11 @@ public class AppSchedulingInfo { resourceRequests.add(cloneResourceRequest(offRackRequest)); } - private void decResourceRequest(String resourceName, Priority priority, - ResourceRequest request) { + private void decResourceRequest(String resourceName, + SchedulerRequestKey schedulerKey, ResourceRequest request) { request.setNumContainers(request.getNumContainers() - 1); if (request.getNumContainers() == 0) { - resourceRequestMap.get(priority).remove(resourceName); + resourceRequestMap.get(schedulerKey).remove(resourceName); } } @@ -665,12 +676,12 @@ public class AppSchedulingInfo { * application. */ private synchronized void allocateRackLocal(SchedulerNode node, - Priority priority, ResourceRequest rackLocalRequest, + SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest, List<ResourceRequest> resourceRequests) { // Update future requirements - decResourceRequest(node.getRackName(), priority, rackLocalRequest); + decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest); - ResourceRequest offRackRequest = resourceRequestMap.get(priority).get( + ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); @@ -712,8 +723,9 @@ public class AppSchedulingInfo { private synchronized void checkForDeactivation() { boolean deactivate = true; - for (Priority priority : getPriorities()) { - ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); + for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { + ResourceRequest request = + getResourceRequest(schedulerKey, ResourceRequest.ANY); if (request != null) { if (request.getNumContainers() > 0) { deactivate = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b4a2639..c4b32a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -98,10 +98,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { protected ApplicationAttemptId attemptId; protected Map<ContainerId, RMContainer> liveContainers = new HashMap<ContainerId, RMContainer>(); - protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = - new HashMap<Priority, Map<NodeId, RMContainer>>(); + protected final Map<SchedulerRequestKey, Map<NodeId, RMContainer>> + reservedContainers = new HashMap<>(); - private final Multiset<Priority> reReservations = HashMultiset.create(); + private final Multiset<SchedulerRequestKey> reReservations = + HashMultiset.create(); private Resource resourceLimit = Resource.newInstance(0, 0); private boolean unmanagedAM = true; @@ -137,7 +138,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { * the application successfully schedules a task (at rack or node local), it * is reset to 0. */ - Multiset<Priority> schedulingOpportunities = HashMultiset.create(); + Multiset<SchedulerRequestKey> schedulingOpportunities = HashMultiset.create(); /** * Count how many times the application has been given an opportunity to @@ -146,12 +147,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { * incremented, and each time the application successfully schedules a task, * it is reset to 0 when schedule any task at corresponding priority. */ - Multiset<Priority> missedNonPartitionedRequestSchedulingOpportunity = + Multiset<SchedulerRequestKey> missedNonPartitionedReqSchedulingOpportunity = HashMultiset.create(); // Time of the last container scheduled at the current allowed level - protected Map<Priority, Long> lastScheduledContainer = - new HashMap<Priority, Long>(); + protected Map<SchedulerRequestKey, Long> lastScheduledContainer = + new HashMap<>(); protected Queue queue; protected boolean isStopped = false; @@ -225,8 +226,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return appSchedulingInfo.getUser(); } - public Map<String, ResourceRequest> getResourceRequests(Priority priority) { - return appSchedulingInfo.getResourceRequests(priority); + public Map<String, ResourceRequest> getResourceRequests( + SchedulerRequestKey schedulerKey) { + return appSchedulingInfo.getResourceRequests(schedulerKey); } public Set<ContainerId> getPendingRelease() { @@ -237,22 +239,24 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return appSchedulingInfo.getNewContainerId(); } - public Collection<Priority> getPriorities() { - return appSchedulingInfo.getPriorities(); + public Collection<SchedulerRequestKey> getSchedulerKeys() { + return appSchedulingInfo.getSchedulerKeys(); } - public synchronized ResourceRequest getResourceRequest(Priority priority, - String resourceName) { - return this.appSchedulingInfo.getResourceRequest(priority, resourceName); + public synchronized ResourceRequest getResourceRequest( + SchedulerRequestKey schedulerKey, String resourceName) { + return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName); } - public synchronized int getTotalRequiredResources(Priority priority) { - ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); + public synchronized int getTotalRequiredResources( + SchedulerRequestKey schedulerKey) { + ResourceRequest request = + getResourceRequest(schedulerKey, ResourceRequest.ANY); return request == null ? 0 : request.getNumContainers(); } - public synchronized Resource getResource(Priority priority) { - return appSchedulingInfo.getResource(priority); + public synchronized Resource getResource(SchedulerRequestKey schedulerKey) { + return appSchedulingInfo.getResource(schedulerKey); } public String getQueueName() { @@ -308,16 +312,18 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } } - protected synchronized void resetReReservations(Priority priority) { - reReservations.setCount(priority, 0); + protected synchronized void resetReReservations( + SchedulerRequestKey schedulerKey) { + reReservations.setCount(schedulerKey, 0); } - protected synchronized void addReReservation(Priority priority) { - reReservations.add(priority); + protected synchronized void addReReservation( + SchedulerRequestKey schedulerKey) { + reReservations.add(schedulerKey); } - public synchronized int getReReservations(Priority priority) { - return reReservations.count(priority); + public synchronized int getReReservations(SchedulerRequestKey schedulerKey) { + return reReservations.count(schedulerKey); } /** @@ -366,7 +372,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { */ public synchronized List<RMContainer> getReservedContainers() { List<RMContainer> reservedContainers = new ArrayList<RMContainer>(); - for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : + for (Map.Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e : this.reservedContainers.entrySet()) { reservedContainers.addAll(e.getValue().values()); } @@ -374,8 +380,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } public synchronized boolean reserveIncreasedContainer(SchedulerNode node, - Priority priority, RMContainer rmContainer, Resource reservedResource) { - if (commonReserve(node, priority, rmContainer, reservedResource)) { + SchedulerRequestKey schedulerKey, RMContainer rmContainer, + Resource reservedResource) { + if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) { attemptResourceUsage.incReserved(node.getPartition(), reservedResource); // succeeded @@ -386,10 +393,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } private synchronized boolean commonReserve(SchedulerNode node, - Priority priority, RMContainer rmContainer, Resource reservedResource) { + SchedulerRequestKey schedulerKey, RMContainer rmContainer, + Resource reservedResource) { try { rmContainer.handle(new RMContainerReservedEvent(rmContainer - .getContainerId(), reservedResource, node.getNodeID(), priority)); + .getContainerId(), reservedResource, node.getNodeID(), schedulerKey)); } catch (InvalidStateTransitionException e) { // We reach here could be caused by container already finished, return // false indicate it fails @@ -397,10 +405,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } Map<NodeId, RMContainer> reservedContainers = - this.reservedContainers.get(priority); + this.reservedContainers.get(schedulerKey); if (reservedContainers == null) { reservedContainers = new HashMap<NodeId, RMContainer>(); - this.reservedContainers.put(priority, reservedContainers); + this.reservedContainers.put(schedulerKey, reservedContainers); } reservedContainers.put(node.getNodeID(), rmContainer); @@ -408,7 +416,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { LOG.debug("Application attempt " + getApplicationAttemptId() + " reserved container " + rmContainer + " on node " + node + ". This attempt currently has " + reservedContainers.size() - + " reserved containers at priority " + priority + + " reserved containers at priority " + schedulerKey.getPriority() + "; currentReservation " + reservedResource); } @@ -416,7 +424,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } public synchronized RMContainer reserve(SchedulerNode node, - Priority priority, RMContainer rmContainer, Container container) { + SchedulerRequestKey schedulerKey, RMContainer rmContainer, + Container container) { // Create RMContainer if necessary if (rmContainer == null) { rmContainer = @@ -427,13 +436,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName()); // Reset the re-reservation count - resetReReservations(priority); + resetReReservations(schedulerKey); } else { // Note down the re-reservation - addReReservation(priority); + addReReservation(schedulerKey); } - commonReserve(node, priority, rmContainer, container.getResource()); + commonReserve(node, schedulerKey, rmContainer, container.getResource()); return rmContainer; } @@ -442,12 +451,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { * Has the application reserved the given <code>node</code> at the * given <code>priority</code>? * @param node node to be checked - * @param priority priority of reserved container + * @param schedulerKey scheduler key of reserved container * @return true is reserved, false if not */ - public synchronized boolean isReserved(SchedulerNode node, Priority priority) { + public synchronized boolean isReserved(SchedulerNode node, + SchedulerRequestKey schedulerKey) { Map<NodeId, RMContainer> reservedContainers = - this.reservedContainers.get(priority); + this.reservedContainers.get(schedulerKey); if (reservedContainers != null) { return reservedContainers.containsKey(node.getNodeID()); } @@ -471,9 +481,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return resourceLimit; } - public synchronized int getNumReservedContainers(Priority priority) { + public synchronized int getNumReservedContainers( + SchedulerRequestKey schedulerKey) { Map<NodeId, RMContainer> reservedContainers = - this.reservedContainers.get(priority); + this.reservedContainers.get(schedulerKey); return (reservedContainers == null) ? 0 : reservedContainers.size(); } @@ -495,8 +506,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public synchronized void showRequests() { if (LOG.isDebugEnabled()) { - for (Priority priority : getPriorities()) { - Map<String, ResourceRequest> requests = getResourceRequests(priority); + for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { + Map<String, ResourceRequest> requests = + getResourceRequests(schedulerKey); if (requests != null) { LOG.debug("showRequests:" + " application=" + getApplicationId() + " headRoom=" + getHeadroom() + " currentConsumption=" @@ -635,59 +647,66 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity( - Priority priority) { - missedNonPartitionedRequestSchedulingOpportunity.add(priority); - return missedNonPartitionedRequestSchedulingOpportunity.count(priority); + SchedulerRequestKey schedulerKey) { + missedNonPartitionedReqSchedulingOpportunity.add(schedulerKey); + return missedNonPartitionedReqSchedulingOpportunity.count(schedulerKey); } public synchronized void - resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) { - missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0); + resetMissedNonPartitionedRequestSchedulingOpportunity( + SchedulerRequestKey schedulerKey) { + missedNonPartitionedReqSchedulingOpportunity.setCount(schedulerKey, 0); } - public synchronized void addSchedulingOpportunity(Priority priority) { - int count = schedulingOpportunities.count(priority); + public synchronized void addSchedulingOpportunity( + SchedulerRequestKey schedulerKey) { + int count = schedulingOpportunities.count(schedulerKey); if (count < Integer.MAX_VALUE) { - schedulingOpportunities.setCount(priority, count + 1); + schedulingOpportunities.setCount(schedulerKey, count + 1); } } - public synchronized void subtractSchedulingOpportunity(Priority priority) { - int count = schedulingOpportunities.count(priority) - 1; - this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); + public synchronized void subtractSchedulingOpportunity( + SchedulerRequestKey schedulerKey) { + int count = schedulingOpportunities.count(schedulerKey) - 1; + this.schedulingOpportunities.setCount(schedulerKey, Math.max(count, 0)); } /** * Return the number of times the application has been given an opportunity * to schedule a task at the given priority since the last time it * successfully did so. + * @param schedulerKey Scheduler Key + * @return number of scheduling opportunities */ - public synchronized int getSchedulingOpportunities(Priority priority) { - return schedulingOpportunities.count(priority); + public synchronized int getSchedulingOpportunities( + SchedulerRequestKey schedulerKey) { + return schedulingOpportunities.count(schedulerKey); } /** - * Should be called when an application has successfully scheduled a container, - * or when the scheduling locality threshold is relaxed. + * Should be called when an application has successfully scheduled a + * container, or when the scheduling locality threshold is relaxed. * Reset various internal counters which affect delay scheduling * - * @param priority The priority of the container scheduled. + * @param schedulerKey The priority of the container scheduled. */ - public synchronized void resetSchedulingOpportunities(Priority priority) { - resetSchedulingOpportunities(priority, System.currentTimeMillis()); + public synchronized void resetSchedulingOpportunities( + SchedulerRequestKey schedulerKey) { + resetSchedulingOpportunities(schedulerKey, System.currentTimeMillis()); } // used for continuous scheduling - public synchronized void resetSchedulingOpportunities(Priority priority, - long currentTimeMs) { - lastScheduledContainer.put(priority, currentTimeMs); - schedulingOpportunities.setCount(priority, 0); + public synchronized void resetSchedulingOpportunities( + SchedulerRequestKey schedulerKey, long currentTimeMs) { + lastScheduledContainer.put(schedulerKey, currentTimeMs); + schedulingOpportunities.setCount(schedulerKey, 0); } @VisibleForTesting - void setSchedulingOpportunities(Priority priority, int count) { - schedulingOpportunities.setCount(priority, count); + void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) { + schedulingOpportunities.setCount(schedulerKey, count); } synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { @@ -747,7 +766,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return this.resourceLimit; } - public synchronized Map<Priority, Long> getLastScheduledContainer() { + public synchronized Map<SchedulerRequestKey, Long> + getLastScheduledContainer() { return this.lastScheduledContainer; } @@ -892,8 +912,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } public synchronized boolean removeIncreaseRequest(NodeId nodeId, - Priority priority, ContainerId containerId) { - return appSchedulingInfo.removeIncreaseRequest(nodeId, priority, + SchedulerRequestKey schedulerKey, ContainerId containerId) { + return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey, containerId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 1f57e07..2efdbd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -308,11 +307,11 @@ public abstract class SchedulerNode { /** * Reserve container for the attempt on this node. * @param attempt Application attempt asking for the reservation. - * @param priority Priority of the reservation. + * @param schedulerKey Priority of the reservation. * @param container Container reserving resources for. */ public abstract void reserveResource(SchedulerApplicationAttempt attempt, - Priority priority, RMContainer container); + SchedulerRequestKey schedulerKey, RMContainer container); /** * Unreserve resources on this node. http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java new file mode 100644 index 0000000..b4988be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; + +/** + * Composite key for outstanding scheduler requests for any schedulable entity. + * Currently it includes {@link Priority}. + */ +public final class SchedulerRequestKey implements + Comparable<SchedulerRequestKey> { + + private final Priority priority; + + public static final SchedulerRequestKey UNDEFINED = + new SchedulerRequestKey(Priority.UNDEFINED); + + /** + * Factory method to generate a SchedulerRequestKey from a ResourceRequest. + * @param req ResourceRequest + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey create(ResourceRequest req) { + return new SchedulerRequestKey(req.getPriority()); + } + + /** + * Convenience method to extract the SchedulerRequestKey used to schedule the + * Container. + * @param container Container + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey extractFrom(Container container) { + return new SchedulerRequestKey(container.getPriority()); + } + + private SchedulerRequestKey(Priority priority) { + this.priority = priority; + } + + /** + * Get the {@link Priority} of the request. + * + * @return the {@link Priority} of the request + */ + public Priority getPriority() { + return priority; + } + + @Override + public int compareTo(SchedulerRequestKey o) { + if (o == null) { + return (priority != null) ? -1 : 0; + } else { + if (priority == null) { + return 1; + } + } + return o.getPriority().compareTo(priority); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SchedulerRequestKey)) { + return false; + } + + SchedulerRequestKey that = (SchedulerRequestKey) o; + return getPriority().equals(that.getPriority()); + + } + + @Override + public int hashCode() { + return getPriority().hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..9aae909 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1265,7 +1265,8 @@ public class LeafQueue extends AbstractCSQueue { } if (null != priority) { - removed = app.unreserve(rmContainer.getContainer().getPriority(), node, + removed = app.unreserve( + rmContainer.getAllocatedSchedulerKey(), node, rmContainer); } @@ -1321,7 +1322,7 @@ public class LeafQueue extends AbstractCSQueue { // Remove container increase request if it exists application.removeIncreaseRequest(node.getNodeID(), - rmContainer.getAllocatedPriority(), rmContainer.getContainerId()); + rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainerId()); boolean removed = false; @@ -1335,7 +1336,7 @@ public class LeafQueue extends AbstractCSQueue { // happen under scheduler's lock... // So, this is, in effect, a transaction across application & node if (rmContainer.getState() == RMContainerState.RESERVED) { - removed = application.unreserve(rmContainer.getReservedPriority(), + removed = application.unreserve(rmContainer.getReservedSchedulerKey(), node, rmContainer); } else { removed = @@ -1785,7 +1786,8 @@ public class LeafQueue extends AbstractCSQueue { // Do we have increase request for the same container? If so, remove it boolean hasIncreaseRequest = app.removeIncreaseRequest(decreaseRequest.getNodeId(), - decreaseRequest.getPriority(), decreaseRequest.getContainerId()); + decreaseRequest.getRMContainer().getAllocatedSchedulerKey(), + decreaseRequest.getContainerId()); if (hasIncreaseRequest) { if (LOG.isDebugEnabled()) { LOG.debug("While processing decrease requests, found an increase" http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aace38b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 25e5824..4a2ae18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -37,6 +36,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -115,7 +116,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { node.getUnallocatedResource())) { // OK, we can allocate this increase request // Unreserve it first - application.unreserve(increaseRequest.getPriority(), + application.unreserve( + increaseRequest.getRMContainer().getAllocatedSchedulerKey(), (FiCaSchedulerNode) node, increaseRequest.getRMContainer()); // Notify application @@ -152,7 +154,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { return createSuccessfullyIncreasedCSAssignment(increaseRequest, false); } else { boolean reservationSucceeded = - application.reserveIncreasedContainer(increaseRequest.getPriority(), + application.reserveIncreasedContainer( + increaseRequest.getRMContainer().getAllocatedSchedulerKey(), node, increaseRequest.getRMContainer(), increaseRequest.getDeltaCapacity()); @@ -228,11 +231,11 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { * priority, but will skip increase request and move to next increase * request if queue-limit or user-limit aren't satisfied */ - for (Priority priority : application.getPriorities()) { + for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { if (LOG.isDebugEnabled()) { LOG.debug("Looking at increase request for application=" + application.getApplicationAttemptId() + " priority=" - + priority); + + schedulerKey.getPriority()); } /* @@ -242,14 +245,14 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { * cannot be allocated. */ Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap = - sinfo.getIncreaseRequests(nodeId, priority); + sinfo.getIncreaseRequests(nodeId, schedulerKey); // We don't have more increase request on this priority, skip.. if (null == increaseRequestMap) { if (LOG.isDebugEnabled()) { LOG.debug("There's no increase request for " + application.getApplicationAttemptId() + " priority=" - + priority); + + schedulerKey.getPriority()); } continue; } @@ -318,7 +321,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { // Remove invalid in request requests if (!toBeRemovedRequests.isEmpty()) { for (SchedContainerChangeRequest req : toBeRemovedRequests) { - sinfo.removeIncreaseRequest(req.getNodeId(), req.getPriority(), + sinfo.removeIncreaseRequest(req.getNodeId(), + req.getRMContainer().getAllocatedSchedulerKey(), req.getContainerId()); } } @@ -337,8 +341,9 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { // We already reserved this increase container SchedContainerChangeRequest request = - sinfo.getIncreaseRequest(nodeId, reservedContainer.getContainer() - .getPriority(), reservedContainer.getContainerId()); + sinfo.getIncreaseRequest(nodeId, + reservedContainer.getAllocatedSchedulerKey(), + reservedContainer.getContainerId()); // We will cancel the reservation any of following happens // - Container finished --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org