YARN-4511. Common scheduler changes to support scheduler-specific oversubscription implementations.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b64dc29c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b64dc29c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b64dc29c Branch: refs/heads/YARN-1011 Commit: b64dc29c35026db6d615e57286fc06f26b99fdc4 Parents: 5ef1fdc Author: Haibo Chen <haiboc...@apache.org> Authored: Thu Nov 2 09:12:19 2017 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Mon Jun 4 16:25:31 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 6 + .../yarn/sls/scheduler/RMNodeWrapper.java | 6 + .../resourcemanager/ResourceTrackerService.java | 3 +- .../monitor/capacity/TempSchedulerNode.java | 2 +- .../server/resourcemanager/rmnode/RMNode.java | 7 + .../resourcemanager/rmnode/RMNodeImpl.java | 13 +- .../scheduler/AbstractYarnScheduler.java | 4 +- .../scheduler/ClusterNodeTracker.java | 6 +- .../scheduler/SchedulerNode.java | 317 +++++++++++---- .../scheduler/SchedulerNodeReport.java | 4 +- .../scheduler/capacity/CapacityScheduler.java | 2 +- .../allocator/RegularContainerAllocator.java | 4 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 2 +- .../common/fica/FiCaSchedulerNode.java | 11 +- .../scheduler/fair/FSPreemptionThread.java | 2 +- .../scheduler/fair/FSSchedulerNode.java | 9 +- .../yarn/server/resourcemanager/MockNodes.java | 6 + .../TestWorkPreservingRMRestart.java | 39 +- ...alCapacityPreemptionPolicyMockFramework.java | 2 +- ...alCapacityPreemptionPolicyMockFramework.java | 6 +- .../scheduler/TestAbstractYarnScheduler.java | 4 +- .../scheduler/TestSchedulerNode.java | 393 +++++++++++++++++++ .../capacity/TestCapacityScheduler.java | 2 +- .../TestCapacitySchedulerAsyncScheduling.java | 8 +- .../scheduler/capacity/TestLeafQueue.java | 4 +- .../TestNodeLabelContainerAllocation.java | 14 +- .../fair/TestContinuousScheduling.java | 42 +- 
.../scheduler/fair/TestFSSchedulerNode.java | 18 +- .../scheduler/fair/TestFairScheduler.java | 14 +- .../scheduler/fifo/TestFifoScheduler.java | 4 +- 30 files changed, 787 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 0c99139..4b9800f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -200,6 +201,11 @@ public class NodeInfo { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + + @Override public long getUntrackedTimeStamp() { return 0; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 78645e9..a652ac8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -188,6 +189,11 @@ public class RMNodeWrapper implements RMNode { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return node.getOverAllocationInfo(); + } + + @Override public long getUntrackedTimeStamp() { return 0; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e997192..00d1169 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -395,7 +395,8 @@ public class ResourceTrackerService extends AbstractService implements .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion, physicalResource); + resolve(host), capability, nodeManagerVersion, physicalResource, + request.getOverAllocationInfo()); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java index 320f262..25777af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java @@ -51,7 +51,7 @@ public class TempSchedulerNode { public static 
TempSchedulerNode fromSchedulerNode( FiCaSchedulerNode schedulerNode) { TempSchedulerNode n = new TempSchedulerNode(); - n.totalResource = Resources.clone(schedulerNode.getTotalResource()); + n.totalResource = Resources.clone(schedulerNode.getCapacity()); n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource()); n.runningContainers = schedulerNode.getCopiedListOfRunningContainers(); n.reservedContainer = schedulerNode.getReservedContainer(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 872f2a6..a858b47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; /** * Node managers information on available resources @@ -115,6 +116,12 @@ public interface RMNode { public ResourceUtilization getNodeUtilization(); /** + * 
Get the node overallocation threshold. + * @return the overallocation threshold + */ + OverAllocationInfo getOverAllocationInfo(); + + /** * the physical resources in the node. * @return the physical resources in the node. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b942afa..5a7c008 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; @@ -109,6 +110,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private final WriteLock 
writeLock; private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue; + private final OverAllocationInfo overallocationInfo; private volatile boolean nextHeartBeat = true; private final NodeId nodeId; @@ -364,12 +366,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this(nodeId, context, hostName, cmPort, httpPort, node, capability, - nodeManagerVersion, null); + nodeManagerVersion, null, null); } public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, - String nodeManagerVersion, Resource physResource) { + String nodeManagerVersion, Resource physResource, + OverAllocationInfo overAllocationInfo) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -384,6 +387,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { this.nodeManagerVersion = nodeManagerVersion; this.timeStamp = 0; this.physicalResource = physResource; + this.overallocationInfo = overAllocationInfo; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -533,6 +537,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return this.overallocationInfo; + } + public void setNodeUtilization(ResourceUtilization nodeUtilization) { this.writeLock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d2e81a5..1505958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -357,7 +357,7 @@ public abstract class AbstractYarnScheduler } application.containerLaunchedOnNode(containerId, node.getNodeID()); - node.containerStarted(containerId); + node.containerLaunched(containerId); } finally { readLock.unlock(); } @@ -825,7 +825,7 @@ public abstract class AbstractYarnScheduler writeLock.lock(); SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); - Resource oldResource = node.getTotalResource(); + Resource oldResource = node.getCapacity(); if (!oldResource.equals(newResource)) { // Notify NodeLabelsManager about this change rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 66d8810..3030373 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -90,7 +90,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> { nodesList.add(node); // Update cluster capacity - Resources.addTo(clusterCapacity, node.getTotalResource()); + Resources.addTo(clusterCapacity, node.getCapacity()); staleClusterCapacity = Resources.clone(clusterCapacity); // Update maximumAllocation @@ -175,7 +175,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> { } // Update cluster capacity - Resources.subtractFrom(clusterCapacity, node.getTotalResource()); + Resources.subtractFrom(clusterCapacity, node.getCapacity()); staleClusterCapacity = Resources.clone(clusterCapacity); // Update maximumAllocation @@ -237,7 +237,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> { } private void updateMaxResources(SchedulerNode node, boolean add) { - Resource totalResource = node.getTotalResource(); + Resource totalResource = node.getCapacity(); ResourceInformation[] totalResources; if (totalResource != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index d5bfc57..5c8efbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -58,23 +59,32 @@ public abstract class SchedulerNode { private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + private Resource capacity; private Resource unallocatedResource = Resource.newInstance(0, 0); - private Resource allocatedResource = Resource.newInstance(0, 0); - private Resource totalResource; + private RMContainer reservedContainer; - private volatile int numContainers; private volatile ResourceUtilization containersUtilization = ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); - /* set of containers that are allocated containers */ - private final Map<ContainerId, ContainerInfo> launchedContainers = - new HashMap<>(); + private final Map<ContainerId, ContainerInfo> + allocatedContainers = new HashMap<>(); + + private volatile int numGuaranteedContainers = 0; + 
private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0); + + private volatile int numOpportunisticContainers = 0; + private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0); private final RMNode rmNode; private final String nodeName; + // The total amount of resources requested by containers that have been + // allocated but not yet launched on the node. + protected Resource resourceAllocatedPendingLaunch = + Resource.newInstance(0, 0); + private volatile Set<String> labels = null; // Last updated time @@ -84,7 +94,7 @@ public abstract class SchedulerNode { Set<String> labels) { this.rmNode = node; this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + this.capacity = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -107,9 +117,9 @@ public abstract class SchedulerNode { * @param resource Total resources on the node. 
*/ public synchronized void updateTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + this.capacity = Resources.clone(resource); + this.unallocatedResource = Resources.subtract(capacity, + this.allocatedResourceGuaranteed); } /** @@ -168,17 +178,83 @@ public abstract class SchedulerNode { protected synchronized void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { Container container = rmContainer.getContainer(); - if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { - deductUnallocatedResource(container.getResource()); - ++numContainers; + + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceAllocated(rmContainer, launchedOnNode); + } else { + opportunisticContainerResourceAllocated(rmContainer, launchedOnNode); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " and type " + + container.getExecutionType() + " on host " + toString()); + } + } + + /** + * Handle an allocation of a GUARANTEED container. 
+ * @param rmContainer the allocated GUARANTEED container + * @param launchedOnNode true if the container has been launched + */ + private void guaranteedContainerResourceAllocated( + RMContainer rmContainer, boolean launchedOnNode) { + Container container = rmContainer.getContainer(); + + if (container.getExecutionType() != ExecutionType.GUARANTEED) { + throw new YarnRuntimeException("Inapplicable ExecutionType: " + + container.getExecutionType()); } - launchedContainers.put(container.getId(), + allocatedContainers.put(container.getId(), new ContainerInfo(rmContainer, launchedOnNode)); + + Resource resource = container.getResource(); + if (containerResourceAllocated(resource, allocatedResourceGuaranteed)) { + Resources.subtractFrom(unallocatedResource, resource); + } + + numGuaranteedContainers++; } /** - * Get unallocated resources on the node. + * Handle an allocation of an OPPORTUNISTIC container. + * @param rmContainer the allocated OPPORTUNISTIC container + * @param launchedOnNode true if the container has been launched + */ + private void opportunisticContainerResourceAllocated( + RMContainer rmContainer, boolean launchedOnNode) { + Container container = rmContainer.getContainer(); + + if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) { + throw new YarnRuntimeException("Inapplicable ExecutionType: " + + container.getExecutionType()); + } + + allocatedContainers.put(rmContainer.getContainerId(), + new ContainerInfo(rmContainer, launchedOnNode)); + if (containerResourceAllocated( + container.getResource(), allocatedResourceOpportunistic)) { + // nothing to do here + } + numOpportunisticContainers++; + } + + private boolean containerResourceAllocated(Resource allocated, + Resource aggregatedResources) { + if (allocated == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return false; + } + Resources.addTo(resourceAllocatedPendingLaunch, allocated); + Resources.addTo(aggregatedResources, allocated); + 
return true; + } + + + /** + * Get resources that are not allocated to GUARANTEED containers on the node. * @return Unallocated resources on the node */ public synchronized Resource getUnallocatedResource() { @@ -186,42 +262,57 @@ public abstract class SchedulerNode { } /** - * Get allocated resources on the node. - * @return Allocated resources on the node + * Get resources allocated to GUARANTEED containers on the node. + * @return Allocated resources to GUARANTEED containers on the node */ public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + return this.allocatedResourceGuaranteed; + } + + /** + * Get resources allocated to OPPORTUNISTIC containers on the node. + * @return Allocated resources to OPPORTUNISTIC containers on the node + */ + public synchronized Resource getOpportunisticResourceAllocated() { + return this.allocatedResourceOpportunistic; + } + + @VisibleForTesting + public synchronized Resource getResourceAllocatedPendingLaunch() { + return this.resourceAllocatedPendingLaunch; } /** * Get total resources on the node. * @return Total resources on the node. */ - public synchronized Resource getTotalResource() { - return this.totalResource; + public synchronized Resource getCapacity() { + return this.capacity; } /** - * Check if a container is launched by this node. + * Check if a GUARANTEED container is launched by this node. * @return If the container is launched by the node. */ - public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; - } - return false; + @VisibleForTesting + public synchronized boolean isValidGuaranteedContainer( + ContainerId containerId) { + ContainerInfo containerInfo = allocatedContainers.get(containerId); + return containerInfo != null && ExecutionType.GUARANTEED == + containerInfo.container.getExecutionType(); } /** - * Update the resources of the node when releasing a container. 
- * @param container Container to release. + * Check if an OPPORTUNISTIC container is launched by this node. + * @param containerId id of the container to check + * @return If the container is launched by the node. */ - protected synchronized void updateResourceForReleasedContainer( - Container container) { - if (container.getExecutionType() == ExecutionType.GUARANTEED) { - addUnallocatedResource(container.getResource()); - --numContainers; - } + @VisibleForTesting + public synchronized boolean isValidOpportunisticContainer( + ContainerId containerId) { + ContainerInfo containerInfo = allocatedContainers.get(containerId); + return containerInfo != null && ExecutionType.OPPORTUNISTIC == + containerInfo.container.getExecutionType(); } /** @@ -231,25 +322,34 @@ public abstract class SchedulerNode { */ public synchronized void releaseContainer(ContainerId containerId, boolean releasedByNode) { - ContainerInfo info = launchedContainers.get(containerId); - if (info == null) { + RMContainer rmContainer = getContainer(containerId); + if (rmContainer == null) { + LOG.warn("Invalid container " + containerId + " is released."); return; } - if (!releasedByNode && info.launchedOnNode) { - // wait until node reports container has completed + + if (!allocatedContainers.containsKey(containerId)) { + // do not process if the container is never allocated on the node + return; + } + if (!releasedByNode && + allocatedContainers.get(containerId).launchedOnNode) { + // only process if the container has not been launched on a node + // yet or it is released by node. 
return; } - launchedContainers.remove(containerId); - Container container = info.container.getContainer(); - updateResourceForReleasedContainer(container); + Container container = rmContainer.getContainer(); + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerReleased(container); + } else { + opportunisticContainerReleased(container); + } if (LOG.isDebugEnabled()) { - LOG.debug("Released container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available" + ", release resources=" + true); + LOG.debug("Released " + container.getExecutionType() + " container " + + containerId + " of " + "capacity " + container.getResource() + + " on node (" + toString() + ")" + ", release resources=" + true); } } @@ -257,42 +357,75 @@ public abstract class SchedulerNode { * Inform the node that a container has launched. * @param containerId ID of the launched container */ - public synchronized void containerStarted(ContainerId containerId) { - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { + public synchronized void containerLaunched(ContainerId containerId) { + ContainerInfo info = allocatedContainers.get(containerId); + if (info != null && !info.launchedOnNode) { info.launchedOnNode = true; + Resources.subtractFrom(resourceAllocatedPendingLaunch, + info.container.getContainer().getResource()); } } /** - * Add unallocated resources to the node. This is used when unallocating a - * container. - * @param resource Resources to add. + * Handle the release of a GUARANTEED container. + * @param container Container to release. 
*/ - private synchronized void addUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; + protected synchronized void guaranteedContainerReleased( + Container container) { + if (container.getExecutionType() != ExecutionType.GUARANTEED) { + throw new YarnRuntimeException("Inapplicable ExecutionType: " + + container.getExecutionType()); + } + + if (containerResourceReleased(container, allocatedResourceGuaranteed)) { + Resources.addTo(unallocatedResource, container.getResource()); } - Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); + // do not update allocated containers until the resources of + // the container are released because we need to check if we + // need to update resourceAllocatedPendingLaunch in case the + // container has not been launched on the node. + allocatedContainers.remove(container.getId()); + numGuaranteedContainers--; } /** - * Deduct unallocated resources from the node. This is used when allocating a - * container. - * @param resource Resources to deduct. + * Handle the release of an OPPORTUNISTIC container. + * @param container Container to release. 
*/ - @VisibleForTesting - public synchronized void deductUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " + private void opportunisticContainerReleased( + Container container) { + if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) { + throw new YarnRuntimeException("Inapplicable ExecutionType: " + + container.getExecutionType()); + } + + if (containerResourceReleased(container, allocatedResourceOpportunistic)) { + // nothing to do here + } + // do not update allocated containers until the resources of + // the container are released because we need to check if we + // need to update resourceAllocatedPendingLaunch in case the + // container has not been launched on the node. + allocatedContainers.remove(container.getId()); + numOpportunisticContainers--; + } + + private boolean containerResourceReleased(Container container, + Resource aggregatedResource) { + Resource released = container.getResource(); + if (released == null) { + LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); - return; + return false; } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); + Resources.subtractFrom(aggregatedResource, released); + + if (!allocatedContainers.get(container.getId()).launchedOnNode) { + // update resourceAllocatedPendingLaunch if the container has + // not yet been launched on the node + Resources.subtractFrom(resourceAllocatedPendingLaunch, released); + } + return true; } /** @@ -312,17 +445,28 @@ public abstract class SchedulerNode { @Override public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" - + getNumContainers() + " available=" + getUnallocatedResource() - + " used=" + getAllocatedResource(); + return "host: " + rmNode.getNodeAddress() + " #guaranteed containers=" + + getNumGuaranteedContainers() + " #opportunistic containers=" + + 
getNumOpportunisticContainers() + " available=" + + getUnallocatedResource() + " used by guaranteed containers=" + + allocatedResourceGuaranteed + " used by opportunistic containers=" + + allocatedResourceOpportunistic; + } + + /** + * Get number of active GUARANTEED containers on the node. + * @return Number of active GUARANTEED containers on the node. + */ + public int getNumGuaranteedContainers() { + return numGuaranteedContainers; } /** - * Get number of active containers on the node. - * @return Number of active containers on the node. + * Get number of active OPPORTUNISTIC containers on the node. + * @return Number of active OPPORTUNISTIC containers on the node. */ - public int getNumContainers() { - return numContainers; + public int getNumOpportunisticContainers() { + return numOpportunisticContainers; } /** @@ -330,8 +474,8 @@ public abstract class SchedulerNode { * @return A copy of containers running on the node. */ public synchronized List<RMContainer> getCopiedListOfRunningContainers() { - List<RMContainer> result = new ArrayList<>(launchedContainers.size()); - for (ContainerInfo info : launchedContainers.values()) { + List<RMContainer> result = new ArrayList<>(allocatedContainers.size()); + for (ContainerInfo info : allocatedContainers.values()) { result.add(info.container); } return result; @@ -341,12 +485,14 @@ public abstract class SchedulerNode { * Get the containers running on the node with AM containers at the end. * @return A copy of running containers with AM containers at the end. 
*/ - public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() { + public synchronized List<RMContainer> + getRunningGuaranteedContainersWithAMsAtTheEnd() { LinkedList<RMContainer> result = new LinkedList<>(); - for (ContainerInfo info : launchedContainers.values()) { + for (ContainerInfo info : allocatedContainers.values()) { if(info.container.isAMContainer()) { result.addLast(info.container); - } else { + } else if (info.container.getExecutionType() == + ExecutionType.GUARANTEED){ result.addFirst(info.container); } } @@ -359,12 +505,9 @@ public abstract class SchedulerNode { * @return The container for the specified container ID */ protected synchronized RMContainer getContainer(ContainerId containerId) { - RMContainer container = null; - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { - container = info.container; - } - return container; + ContainerInfo info = allocatedContainers.get(containerId); + + return info != null ? info.container : null; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java index fa71a25..ea30d78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java @@ -31,11 +31,11 @@ public class SchedulerNodeReport { private final Resource used; private final Resource avail; private final int num; - + public SchedulerNodeReport(SchedulerNode node) { this.used = node.getAllocatedResource(); this.avail = node.getUnallocatedResource(); - this.num = node.getNumContainers(); + this.num = node.getNumGuaranteedContainers(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1c9bf6b..1f8fa4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1868,7 +1868,7 @@ public class CapacityScheduler extends // update this node to node label manager if (labelManager != null) { labelManager.activateNode(nodeManager.getNodeID(), - schedulerNode.getTotalResource()); + schedulerNode.getCapacity()); } Resource clusterResource = getClusterResource(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 99deb1a..a7d8a21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -515,13 +515,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { Resource capability = pendingAsk.getPerAllocationResource(); Resource available = node.getUnallocatedResource(); - Resource totalResource = node.getTotalResource(); + Resource totalResource = node.getCapacity(); if (!Resources.lessThanOrEqual(rc, clusterResource, capability, totalResource)) { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for ask : " + pendingAsk - + " node total capability : " + node.getTotalResource()); + + " node total capability : " + node.getCapacity()); // Skip this locality request ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 3ec8191..a9ee44e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -1017,7 +1017,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { diagnosticMessageBldr.append(" ( Partition : "); diagnosticMessageBldr.append(node.getLabels()); diagnosticMessageBldr.append(", Total resource : "); - diagnosticMessageBldr.append(node.getTotalResource()); + diagnosticMessageBldr.append(node.getCapacity()); diagnosticMessageBldr.append(", Available resource : "); diagnosticMessageBldr.append(node.getUnallocatedResource()); diagnosticMessageBldr.append(" )."); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 7277779..52165a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -144,9 +144,9 @@ public class FiCaSchedulerNode extends SchedulerNode { } @Override - protected synchronized void updateResourceForReleasedContainer( + protected synchronized void guaranteedContainerReleased( Container container) { - super.updateResourceForReleasedContainer(container); + super.guaranteedContainerReleased(container); if (killableContainers.containsKey(container.getId())) { Resources.subtractFrom(totalKillableResources, container.getResource()); killableContainers.remove(container.getId()); @@ -168,9 +168,10 @@ public class FiCaSchedulerNode extends SchedulerNode { final Container container = rmContainer.getContainer(); LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); + + ", which has " + getNumGuaranteedContainers() + " guaranteed" + + " containers using " + getAllocatedResource() + ", " + + getNumOpportunisticContainers() + " opportunistic containers" + + " using " + 
getOpportunisticResourceAllocated()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index c32565f..dcb076f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -193,7 +193,7 @@ class FSPreemptionThread extends Thread { // Figure out list of containers to consider List<RMContainer> containersToCheck = - node.getRunningContainersWithAMsAtTheEnd(); + node.getRunningGuaranteedContainersWithAMsAtTheEnd(); containersToCheck.removeAll(node.getContainersForPreemption()); // Initialize potential with unallocated but not reserved resources http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 44ec9c3..95490f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -242,11 +242,12 @@ public class FSSchedulerNode extends SchedulerNode { super.allocateContainer(rmContainer, launchedOnNode); if (LOG.isDebugEnabled()) { final Container container = rmContainer.getContainer(); - LOG.debug("Assigned container " + container.getId() + " of capacity " + LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); + + ", which has " + getNumGuaranteedContainers() + " guaranteed " + + "containers using " + getAllocatedResource() + ", " + + getNumOpportunisticContainers() + " opportunistic containers " + + "using " + getOpportunisticResourceAllocated()); } Resource allocated = rmContainer.getAllocatedResource(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 84105d9..522d4f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -262,6 +263,11 @@ public class MockNodes { return this.nodeUtilization; } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + public OpportunisticContainersStatus getOpportunisticContainersStatus() { return OpportunisticContainersStatus.newInstance(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index e4c83e3..5b49303 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -226,13 +226,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue(schedulerNode1.isValidContainer(runningContainer - .getContainerId())); - assertFalse(schedulerNode1.isValidContainer(completedContainer - .getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); @@ -383,13 +384,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); 
- assertTrue( - schedulerNode1.isValidContainer(runningContainer.getContainerId())); - assertFalse( - schedulerNode1.isValidContainer(completedContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); @@ -1650,13 +1652,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue( - schedulerNode1.isValidContainer(runningContainer.getContainerId())); - assertFalse( - schedulerNode1.isValidContainer(completedContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 64b56fb..faf8ccd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -514,7 +514,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { totalRes = parseResourceFromString(resSring); } } - when(sn.getTotalResource()).thenReturn(totalRes); + when(sn.getCapacity()).thenReturn(totalRes); when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes)); // TODO, add settings of killable resources when necessary http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java index 964a230..89fb846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java @@ -237,17 +237,17 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework // Check host resources Assert.assertEquals(3, this.cs.getAllNodes().size()); SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1)); - Assert.assertEquals(100, node1.getTotalResource().getMemorySize()); + Assert.assertEquals(100, node1.getCapacity().getMemorySize()); Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size()); Assert.assertNull(node1.getReservedContainer()); SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1)); - Assert.assertEquals(0, node2.getTotalResource().getMemorySize()); + Assert.assertEquals(0, node2.getCapacity().getMemorySize()); Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size()); Assert.assertNotNull(node2.getReservedContainer()); SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1)); - Assert.assertEquals(30, node3.getTotalResource().getMemorySize()); + Assert.assertEquals(30, node3.getCapacity().getMemorySize()); Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size()); Assert.assertNull(node3.getReservedContainer()); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index c0f8d39..3113a16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -282,12 +282,12 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { SchedulerNode mockNode1 = mock(SchedulerNode.class); when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080)); when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource); - when(mockNode1.getTotalResource()).thenReturn(fullResource1); + when(mockNode1.getCapacity()).thenReturn(fullResource1); SchedulerNode mockNode2 = mock(SchedulerNode.class); when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081)); when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource); - when(mockNode2.getTotalResource()).thenReturn(fullResource2); + when(mockNode2.getCapacity()).thenReturn(fullResource2); verifyMaximumResourceCapability(configuredMaximumResource, scheduler); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java new file mode 100644 index 0000000..b04b277 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for SchedulerNode. 
+ */ +public class TestSchedulerNode { + private final Resource nodeCapacity = Resource.newInstance(1024*10, 4); + + @Test + public void testAllocateAndReleaseGuaranteedContainer() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // launch the container on the node + schedulerNode.containerLaunched(containerId); + + Assert.assertEquals("The container should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, true); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + } + + @Test + public void testAllocateAndReleaseOpportunisticContainer() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + 
ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("Incorrect remaining resource accounted.", + nodeCapacity, schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumOpportunisticContainers()); + Assert.assertTrue( + schedulerNode.isValidOpportunisticContainer(containerId)); + + // launch the container on the node + schedulerNode.containerLaunched(containerId); + + Assert.assertEquals("The container should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, true); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumOpportunisticContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertFalse("The container should have been released", + schedulerNode.isValidOpportunisticContainer(containerId)); + } + + @Test + public void testAllocateAndReleaseContainers() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + + Resource guaranteedResource = Resource.newInstance(4096, 1); + RMContainer guaranteedContainer = + createRMContainer(0, guaranteedResource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId guaranteedContainerId = guaranteedContainer.getContainerId(); + + // allocate a guaranteed container on the node + 
schedulerNode.allocateContainer(guaranteedContainer); + + Assert.assertEquals("The guaranteed container should have been allocated", + guaranteedResource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, guaranteedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The guaranteed container should have been allocated" + + " but not launched", guaranteedResource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(guaranteedContainerId)); + + Resource opportunisticResource = Resource.newInstance(8192, 4); + RMContainer opportunisticContainer = + createRMContainer(1, opportunisticResource, + ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode()); + ContainerId opportunisticContainerId = + opportunisticContainer.getContainerId(); + + // allocate an opportunistic container on the node + schedulerNode.allocateContainer(opportunisticContainer); + + Assert.assertEquals("The opportunistic container should have been" + + " allocated", opportunisticResource, + schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, guaranteedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The opportunistic container should also have been" + + " allocated but not launched", + Resources.add(guaranteedResource, opportunisticResource), + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumOpportunisticContainers()); + Assert.assertTrue( + schedulerNode.isValidOpportunisticContainer(opportunisticContainerId)); + + // launch both containers on the node + 
schedulerNode.containerLaunched(guaranteedContainerId); + schedulerNode.containerLaunched(opportunisticContainerId); + + Assert.assertEquals("Both containers should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release both containers + schedulerNode.releaseContainer(guaranteedContainerId, true); + schedulerNode.releaseContainer(opportunisticContainerId, true); + + Assert.assertEquals("The guaranteed container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The opportunistic container should have been released", + 0, schedulerNode.getNumOpportunisticContainers()); + Assert.assertEquals("The guaranteed container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertEquals("The opportunistic container should have been released", + Resources.none(), schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertFalse("The guaranteed container should have been released", + schedulerNode.isValidGuaranteedContainer(guaranteedContainerId)); + Assert.assertFalse("The opportunistic container should have been released", + schedulerNode.isValidOpportunisticContainer(opportunisticContainerId)); + } + + @Test + public void testReleaseLaunchedContainerNotAsNode() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + 
Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // launch the container on the node + schedulerNode.containerLaunched(containerId); + + Assert.assertEquals("The container should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, false); + Assert.assertEquals("The container should not have been released", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should not have been released", + resource, schedulerNode.getAllocatedResource()); + Assert.assertTrue("The container should not have been released", + schedulerNode.isValidGuaranteedContainer(containerId)); + } + + @Test + public void testReleaseUnlaunchedContainerAsNode() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", + resource, schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, 
schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // make sure the container is not launched yet + Assert.assertEquals("The container should not be launched already", + resource, schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, true); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertFalse("The container should have been released", + schedulerNode.isValidGuaranteedContainer(containerId)); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + } + + @Test + public void testReleaseUnlaunchedContainerNotAsNode() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // make sure the container is not 
launched yet + Assert.assertEquals("The container should not have been launched", + resource, schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, false); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertFalse("The container should have been released", + schedulerNode.isValidGuaranteedContainer(containerId)); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + } + + private SchedulerNode createSchedulerNode(Resource capacity) { + NodeId nodeId = NodeId.newInstance("localhost", 0); + + RMNode rmNode = mock(RMNode.class); + when(rmNode.getNodeID()).thenReturn(nodeId); + when(rmNode.getHostName()).thenReturn(nodeId.getHost()); + when(rmNode.getTotalCapability()).thenReturn(capacity); + when(rmNode.getRackName()).thenReturn("/default"); + when(rmNode.getHttpAddress()).thenReturn(nodeId.getHost()); + when(rmNode.getNodeAddress()).thenReturn(nodeId.getHost()); + + return new SchedulerNodeForTest(rmNode); + } + + private static RMContainerImpl createRMContainer(long containerId, + Resource resource, ExecutionType executionType, RMNode node) { + Container container = + createContainer(containerId, resource, executionType, node); + + Dispatcher dispatcher = new AsyncDispatcher(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(dispatcher); + when(rmContext.getSystemMetricsPublisher()). + thenReturn(new NoOpSystemMetricPublisher()); + when(rmContext.getYarnConfiguration()). + thenReturn(new YarnConfiguration()); + when(rmContext.getContainerAllocationExpirer()). + thenReturn(new ContainerAllocationExpirer(dispatcher)); + when(rmContext.getRMApplicationHistoryWriter()). 
+ thenReturn(new RMApplicationHistoryWriter()); + + return new RMContainerImpl(container, null, + container.getId().getApplicationAttemptId(), + node.getNodeID(), "test", rmContext); + } + + private static Container createContainer(long containerId, Resource resource, + ExecutionType executionType, RMNode node) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId. + newInstance(ApplicationId.newInstance(0, 0), 0); + ContainerId cId = + ContainerId.newContainerId(appAttemptId, containerId); + Container container = Container.newInstance( + cId, node.getNodeID(), node.getNodeAddress(), resource, + Priority.newInstance(0), null, executionType); + return container; + } + + + /** + * A test implementation of SchedulerNode for the purpose of testing + * SchedulerNode only. Resource reservation is scheduler-dependent, + * and therefore not covered here. + */ + private static final class SchedulerNodeForTest extends SchedulerNode { + SchedulerNodeForTest(RMNode node) { + super(node, false); + } + + @Override + public void reserveResource(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey, RMContainer container) { + } + + @Override + public void unreserveResource(SchedulerApplicationAttempt attempt) { + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 0b54010..e07dbd0 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -4132,7 +4132,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { // Check total resource of scheduler node is also changed to 1 GB 1 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm_0.getNodeId()).getCapacity(); Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB, totalResource.getMemorySize()); Assert.assertEquals("Total Resource Virtual Cores should be 1", 1, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 338b9f9..0fa3970 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -223,8 +223,8 @@ public class TestCapacitySchedulerAsyncScheduling { // nm1 runs 1 container(app1-container_01/AM) // nm2 runs 1 container(app1-container_02) - Assert.assertEquals(1, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(1, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // kill app attempt1 scheduler.handle( @@ -319,8 +319,8 @@ public class TestCapacitySchedulerAsyncScheduling { // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, // app2-container_01/AM) // nm2 runs 1 container(app1-container_03) - Assert.assertEquals(3, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(3, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // reserve 1 container(app1-container_04) for app1 on nm1 ResourceRequest rr2 = ResourceRequest http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64dc29c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 04bb791..6215ce8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -385,7 +385,7 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals( - (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB), + (int)(node_0.getCapacity().getMemorySize() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); } @@ -684,7 +684,7 @@ public class TestLeafQueue { assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, a.getMetrics().getReservedMB()); assertEquals(0*GB, a.getMetrics().getAllocatedMB()); - assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()), + assertEquals((int)(a.getCapacity() * node_0.getCapacity().getMemorySize()), a.getMetrics().getAvailableMB()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org