Revert "Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen.""
This reverts commit 1235cf5deb9f309e975c4f59ee27caf1282f19f9.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c6acfc44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c6acfc44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c6acfc44

Branch: refs/heads/YARN-1011
Commit: c6acfc44df00abf5899d202c40299f7fa37b3a15
Parents: ed33756
Author: Haibo Chen <haiboc...@apache.org>
Authored: Mon Jun 11 13:34:40 2018 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Mon Jun 11 13:34:40 2018 -0700

----------------------------------------------------------------------
 .../nodemanager/NodeStatusUpdaterImpl.java      |    2 +-
 .../containermanager/ContainerManagerImpl.java  |    8 +-
 .../launcher/ContainerLaunch.java               |    2 +-
 .../launcher/ContainersLauncher.java            |    9 +-
 .../monitor/ContainersMonitor.java              |   38 +-
 .../monitor/ContainersMonitorImpl.java          |   56 +-
 .../AllocationBasedResourceTracker.java         |  114 ++
 ...locationBasedResourceUtilizationTracker.java |  158 ---
 .../scheduler/ContainerScheduler.java           |  317 +++--
 .../scheduler/ContainerSchedulerEventType.java  |    4 +-
 .../scheduler/NMAllocationPolicy.java           |   63 +
 .../scheduler/ResourceUtilizationTracker.java   |   17 +-
 .../SnapshotBasedOverAllocationPolicy.java      |   54 +
 .../UtilizationBasedResourceTracker.java        |   95 ++
 .../BaseContainerManagerTest.java               |   35 +
 .../TestContainersMonitorResourceChange.java    |    9 +-
 .../TestAllocationBasedResourceTracker.java     |   82 ++
 ...locationBasedResourceUtilizationTracker.java |   93 --
 .../TestContainerSchedulerRecovery.java         |   58 +-
 ...estContainerSchedulerWithOverAllocation.java | 1121 ++++++++++++++++++
 20 files changed, 1916 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 44f9740..a2f70d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -532,7 +532,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private ResourceUtilization getContainersUtilization() {
     ContainersMonitor containersMonitor =
         this.context.getContainerManager().getContainersMonitor();
-    return containersMonitor.getContainersUtilization();
+    return containersMonitor.getContainersUtilization(false).getUtilization();
  }

   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3470910..77ea4fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -243,6 +243,12 @@ public class ContainerManagerImpl extends CompositeService implements metrics); addService(rsrcLocalizationSrvc); + this.containersMonitor = createContainersMonitor(exec); + addService(this.containersMonitor); + + // ContainersLauncher must be added after ContainersMonitor + // because the former depends on the latter to initialize + // over-allocation first. containersLauncher = createContainersLauncher(context, exec); addService(containersLauncher); @@ -267,8 +273,6 @@ public class ContainerManagerImpl extends CompositeService implements nmMetricsPublisher = createNMTimelinePublisher(context); context.setNMTimelinePublisher(nmMetricsPublisher); } - this.containersMonitor = createContainersMonitor(exec); - addService(this.containersMonitor); dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 57abfc3..269e319 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -1095,7 +1095,7 @@ public class ContainerLaunch implements Callable<Integer> { * @return Process ID * @throws Exception */ - private String getContainerPid(Path pidFilePath) throws Exception { + protected String getContainerPid(Path pidFilePath) throws Exception { String containerIdStr = container.getContainerId().toString(); String processId = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index cfd5d6a..c3d0a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -116,8 +116,7 @@ public class ContainersLauncher extends AbstractService containerId.getApplicationAttemptId().getApplicationId()); ContainerLaunch launch = - new ContainerLaunch(context, getConfig(), dispatcher, exec, app, - event.getContainer(), dirsHandler, containerManager); + createContainerLaunch(app, event.getContainer()); containerLauncher.submit(launch); running.put(containerId, launch); break; @@ -213,4 +212,10 @@ public class ContainersLauncher extends AbstractService break; } } + + protected ContainerLaunch createContainerLaunch( + Application app, Container container) { + return new ContainerLaunch(context, getConfig(), dispatcher, + exec, app, container, dirsHandler, containerManager); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 64831e9..8da4ec4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -23,10 +23,24 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; public interface ContainersMonitor extends Service, EventHandler<ContainersMonitorEvent>, ResourceView { - ResourceUtilization getContainersUtilization(); + + /** + * Get the aggregate resource utilization of containers running on the node, + * with a timestamp of the measurement. 
+ * @param latest true if the latest result should be returned + * @return ResourceUtilization resource utilization of all containers + */ + ContainersResourceUtilization getContainersUtilization(boolean latest); + + /** + * Get the policy to over-allocate containers when over-allocation is on. + * @return null if over-allocation is turned off + */ + NMAllocationPolicy getContainerOverAllocationPolicy(); float getVmemRatio(); @@ -66,4 +80,26 @@ public interface ContainersMonitor extends Service, * containersMonitor.getVmemRatio()); resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); } + + /** + * A snapshot of resource utilization of all containers with the timestamp. + */ + final class ContainersResourceUtilization { + private final ResourceUtilization utilization; + private final long timestamp; + + public ContainersResourceUtilization( + ResourceUtilization utilization, long timestamp) { + this.utilization = utilization; + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public ResourceUtilization getUtilization() { + return utilization; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index a133117..8e9bad8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -24,6 +24,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,8 +114,9 @@ public class ContainersMonitorImpl extends AbstractService implements CPU, MEMORY } - private ResourceUtilization containersUtilization; + private ContainersResourceUtilization latestContainersUtilization; + private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; private int 
overAlloctionPreemptionCpuCount = -1; @@ -127,7 +132,8 @@ public class ContainersMonitorImpl extends AbstractService implements this.monitoringThread = new MonitoringThread(); - this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); + this.latestContainersUtilization = new ContainersResourceUtilization( + ResourceUtilization.newInstance(-1, -1, -1.0f), -1L); } @Override @@ -363,6 +369,10 @@ public class ContainersMonitorImpl extends AbstractService implements this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance( cpuPreemptionThreshold, memoryPreemptionThreshold); + // TODO make this configurable + this.overAllocationPolicy = + createOverAllocationPolicy(resourceThresholds); + LOG.info("NodeManager oversubscription enabled with overallocation " + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" + @@ -370,6 +380,11 @@ public class ContainersMonitorImpl extends AbstractService implements cpuPreemptionThreshold + ")"); } + protected NMAllocationPolicy createOverAllocationPolicy( + ResourceThresholds resourceThresholds) { + return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this); + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this @@ -653,7 +668,12 @@ public class ContainersMonitorImpl extends AbstractService implements } // Save the aggregated utilization of the containers - setContainersUtilization(trackedContainersUtilization); + setLatestContainersUtilization(trackedContainersUtilization); + + // check opportunity to start containers if over-allocation is on + if (context.isOverAllocationEnabled()) { + attemptToStartContainersUponLowUtilization(); + } // Publish the container utilization metrics to node manager // metrics system. 
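To make the arithmetic behind the new over-allocation check easier to follow, the following is a minimal, self-contained sketch of what SnapshotBasedOverAllocationPolicy.getAvailableResources() (introduced later in this patch) computes, and of how attemptToStartContainersUponLowUtilization() gates the SCHEDULE_CONTAINERS event. It is plain Java with illustrative names and example figures, not the actual NodeManager classes.

/**
 * Simplified sketch of the snapshot-based over-allocation check: resources
 * are considered available when the latest aggregate container utilization
 * is below the configured over-allocation thresholds. Names and numbers are
 * illustrative only.
 */
public final class OverAllocationSketch {

  /** Memory (MB) that could still be over-allocated; may be negative. */
  static long availableMemoryMb(double memThreshold, long pmemAllocatedMb,
      long pmemUtilizedMb) {
    return Math.round(memThreshold * pmemAllocatedMb) - pmemUtilizedMb;
  }

  /** Vcores that could still be over-allocated; may be negative. */
  static int availableVcores(double cpuThreshold, int vcoresAllocated,
      double cpuUtilization) {
    return (int) Math.round((cpuThreshold - cpuUtilization) * vcoresAllocated);
  }

  public static void main(String[] args) {
    // Example: an 8 GB / 8 vcore node with 75% over-allocation thresholds,
    // whose containers currently use 4 GB and 50% of the CPU.
    long memMb = availableMemoryMb(0.75, 8192, 4096);  // 2048 MB
    int vcores = availableVcores(0.75, 8, 0.5);        // 2 vcores
    // Mirrors attemptToStartContainersUponLowUtilization(): only ask the
    // ContainerScheduler to schedule more work when both are positive.
    if (memMb > 0 && vcores > 0) {
      System.out.println("would send SCHEDULE_CONTAINERS: " + memMb
          + " MB and " + vcores + " vcores available");
    }
  }
}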
@@ -1023,12 +1043,34 @@ public class ContainersMonitorImpl extends AbstractService implements } @Override - public ResourceUtilization getContainersUtilization() { - return this.containersUtilization; + public ContainersResourceUtilization getContainersUtilization( + boolean latest) { + // TODO update containerUtilization if latest is true + return this.latestContainersUtilization; + } + + @Override + public NMAllocationPolicy getContainerOverAllocationPolicy() { + return overAllocationPolicy; + } + + private void setLatestContainersUtilization(ResourceUtilization utilization) { + this.latestContainersUtilization = new ContainersResourceUtilization( + utilization, System.currentTimeMillis()); } - private void setContainersUtilization(ResourceUtilization utilization) { - this.containersUtilization = utilization; + @VisibleForTesting + public void attemptToStartContainersUponLowUtilization() { + if (getContainerOverAllocationPolicy() != null) { + Resource available = getContainerOverAllocationPolicy() + .getAvailableResources(); + if (available.getMemorySize() > 0 && + available.getVirtualCores() > 0) { + eventDispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(null, + ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); + } + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java new file mode 100644 index 0000000..86b3698 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the resource utilization tracker that equates + * resource utilization with the total resource allocated to the container. + */ +public class AllocationBasedResourceTracker + implements ResourceUtilizationTracker { + + private static final Logger LOG = + LoggerFactory.getLogger(AllocationBasedResourceTracker.class); + + private static final Resource UNAVAILABLE = + Resource.newInstance(0, 0); + + private ResourceUtilization containersAllocation; + private ContainerScheduler scheduler; + + + AllocationBasedResourceTracker(ContainerScheduler scheduler) { + this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); + this.scheduler = scheduler; + } + + /** + * Get the accumulation of totally allocated resources to containers. + * @return ResourceUtilization Resource Utilization. + */ + @Override + public ResourceUtilization getCurrentUtilization() { + return this.containersAllocation; + } + + /** + * Get the amount of resources that have not been allocated to containers. + * @return Resource resources that have not been allocated to containers. + */ + protected Resource getUnallocatedResources() { + // unallocated resources = node capacity - containers allocation + // = -(container allocation - node capacity) + ResourceUtilization allocationClone = + ResourceUtilization.newInstance(containersAllocation); + getContainersMonitor() + .subtractNodeResourcesFromResourceUtilization(allocationClone); + + Resource unallocated = UNAVAILABLE; + if (allocationClone.getCPU() <= 0 && + allocationClone.getPhysicalMemory() <= 0 && + allocationClone.getVirtualMemory() <= 0) { + int cpu = Math.round(allocationClone.getCPU() * + getContainersMonitor().getVCoresAllocatedForContainers()); + long memory = allocationClone.getPhysicalMemory(); + unallocated = Resource.newInstance(-memory, -cpu); + } + return unallocated; + } + + + @Override + public Resource getAvailableResources() { + return getUnallocatedResources(); + } + + /** + * Add Container's resources to the accumulated allocation. + * @param container Container. + */ + @Override + public void containerLaunched(Container container) { + ContainersMonitor.increaseResourceUtilization( + getContainersMonitor(), this.containersAllocation, + container.getResource()); + } + + /** + * Subtract Container's resources to the accumulated allocation. + * @param container Container. 
+ */ + @Override + public void containerReleased(Container container) { + ContainersMonitor.decreaseResourceUtilization( + getContainersMonitor(), this.containersAllocation, + container.getResource()); + } + + public ContainersMonitor getContainersMonitor() { + return this.scheduler.getContainersMonitor(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java deleted file mode 100644 index 6e2b617..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the {@link ResourceUtilizationTracker} that equates - * resource utilization with the total resource allocated to the container. - */ -public class AllocationBasedResourceUtilizationTracker implements - ResourceUtilizationTracker { - - private static final Logger LOG = - LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class); - - private ResourceUtilization containersAllocation; - private ContainerScheduler scheduler; - - AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) { - this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); - this.scheduler = scheduler; - } - - /** - * Get the accumulation of totally allocated resources to a container. - * @return ResourceUtilization Resource Utilization. 
- */ - @Override - public ResourceUtilization getCurrentUtilization() { - return this.containersAllocation; - } - - /** - * Add Container's resources to the accumulated Utilization. - * @param container Container. - */ - @Override - public void addContainerResources(Container container) { - ContainersMonitor.increaseResourceUtilization( - getContainersMonitor(), this.containersAllocation, - container.getResource()); - } - - /** - * Subtract Container's resources to the accumulated Utilization. - * @param container Container. - */ - @Override - public void subtractContainerResource(Container container) { - ContainersMonitor.decreaseResourceUtilization( - getContainersMonitor(), this.containersAllocation, - container.getResource()); - } - - /** - * Check if NM has resources available currently to run the container. - * @param container Container. - * @return True, if NM has resources available currently to run the container. - */ - @Override - public boolean hasResourcesAvailable(Container container) { - long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L; - return hasResourcesAvailable(pMemBytes, - (long) (getContainersMonitor().getVmemRatio()* pMemBytes), - container.getResource().getVirtualCores()); - } - - private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes, - int cpuVcores) { - // Check physical memory. - if (LOG.isDebugEnabled()) { - LOG.debug("pMemCheck [current={} + asked={} > allowed={}]", - this.containersAllocation.getPhysicalMemory(), - (pMemBytes >> 20), - (getContainersMonitor().getPmemAllocatedForContainers() >> 20)); - } - if (this.containersAllocation.getPhysicalMemory() + - (int) (pMemBytes >> 20) > - (int) (getContainersMonitor() - .getPmemAllocatedForContainers() >> 20)) { - return false; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("before vMemCheck" + - "[isEnabled={}, current={} + asked={} > allowed={}]", - getContainersMonitor().isVmemCheckEnabled(), - this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20), - (getContainersMonitor().getVmemAllocatedForContainers() >> 20)); - } - // Check virtual memory. - if (getContainersMonitor().isVmemCheckEnabled() && - this.containersAllocation.getVirtualMemory() + - (int) (vMemBytes >> 20) > - (int) (getContainersMonitor() - .getVmemAllocatedForContainers() >> 20)) { - return false; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("before cpuCheck [asked={} > allowed={}]", - this.containersAllocation.getCPU(), - getContainersMonitor().getVCoresAllocatedForContainers()); - } - // Check CPU. Compare using integral values of cores to avoid decimal - // inaccuracies. - if (!hasEnoughCpu(this.containersAllocation.getCPU(), - getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) { - return false; - } - return true; - } - - /** - * Returns whether there is enough space for coresRequested in totalCores. - * Converts currentAllocation usage to nearest integer count before comparing, - * as floats are inherently imprecise. NOTE: this calculation assumes that - * requested core counts must be integers, and currentAllocation core count - * must also be an integer. - * - * @param currentAllocation The current allocation, a float value from 0 to 1. - * @param totalCores The total cores in the system. - * @param coresRequested The number of cores requested. - * @return True if currentAllocationtotalCores*coresRequested <= - * totalCores. 
- */ - public boolean hasEnoughCpu(float currentAllocation, long totalCores, - int coresRequested) { - // Must not cast here, as it would truncate the decimal digits. - return Math.round(currentAllocation * totalCores) - + coresRequested <= totalCores; - } - - public ContainersMonitor getContainersMonitor() { - return this.scheduler.getContainersMonitor(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 5cdcf41..f5a86e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +79,10 @@ public class ContainerScheduler extends AbstractService implements // Queue of Guaranteed Containers waiting for resources to run private final LinkedHashMap<ContainerId, Container> queuedGuaranteedContainers = new LinkedHashMap<>(); + // sum of the resources requested by guaranteed containers in queue + private final Resource guaranteedResourcesDemanded = + Resource.newInstance(0, 0); + // Queue of Opportunistic Containers waiting for resources to run private final LinkedHashMap<ContainerId, Container> queuedOpportunisticContainers = new LinkedHashMap<>(); @@ -85,6 +91,10 @@ public class ContainerScheduler extends AbstractService implements // or paused to make room for a guaranteed container. private final Map<ContainerId, Container> oppContainersToKill = new HashMap<>(); + // sum of the resources to be released by opportunistic containers that + // have been marked to be killed or paused. 
+ private final Resource opportunisticResourcesToBeReleased = + Resource.newInstance(0, 0); // Containers launched by the Scheduler will take a while to actually // move to the RUNNING state, but should still be fair game for killing @@ -125,6 +135,17 @@ public class ContainerScheduler extends AbstractService implements DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)); } + @VisibleForTesting + public ContainerScheduler(Context context, AsyncDispatcher dispatcher, + NodeManagerMetrics metrics, int qLength) { + super(ContainerScheduler.class.getName()); + this.context = context; + this.dispatcher = dispatcher; + this.metrics = metrics; + this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength; + this.opportunisticContainersStatus = + OpportunisticContainersStatus.newInstance(); + } @Override public void serviceInit(Configuration conf) throws Exception { @@ -152,20 +173,16 @@ public class ContainerScheduler extends AbstractService implements YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION, YarnConfiguration. DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION); + // We assume over allocation configurations have been initialized + this.utilizationTracker = getResourceTracker(); } - @VisibleForTesting - public ContainerScheduler(Context context, AsyncDispatcher dispatcher, - NodeManagerMetrics metrics, int qLength) { - super(ContainerScheduler.class.getName()); - this.context = context; - this.dispatcher = dispatcher; - this.metrics = metrics; - this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength; - this.utilizationTracker = - new AllocationBasedResourceUtilizationTracker(this); - this.opportunisticContainersStatus = - OpportunisticContainersStatus.newInstance(); + private AllocationBasedResourceTracker getResourceTracker() { + if (context.isOverAllocationEnabled()) { + return new UtilizationBasedResourceTracker(this); + } else { + return new AllocationBasedResourceTracker(this); + } } /** @@ -188,14 +205,18 @@ public class ContainerScheduler extends AbstractService implements if (event instanceof UpdateContainerSchedulerEvent) { onUpdateContainer((UpdateContainerSchedulerEvent) event); } else { - LOG.error("Unknown event type on UpdateCOntainer: " + event.getType()); + LOG.error("Unknown event type on UpdateContainer: " + event.getType()); } break; case SHED_QUEUED_CONTAINERS: shedQueuedOpportunisticContainers(); break; case RECOVERY_COMPLETED: - startPendingContainers(maxOppQueueLength <= 0); + startPendingContainers(false); + break; + case SCHEDULE_CONTAINERS: + startPendingContainers(true); + break; default: LOG.error("Unknown event arrived at ContainerScheduler: " + event.toString()); @@ -210,10 +231,10 @@ public class ContainerScheduler extends AbstractService implements ContainerId containerId = updateEvent.getContainer().getContainerId(); if (updateEvent.isResourceChange()) { if (runningContainers.containsKey(containerId)) { - this.utilizationTracker.subtractContainerResource( + this.utilizationTracker.containerReleased( new ContainerImpl(getConfig(), null, null, null, null, updateEvent.getOriginalToken(), context)); - this.utilizationTracker.addContainerResources( + this.utilizationTracker.containerLaunched( updateEvent.getContainer()); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent(containerId, @@ -229,17 +250,20 @@ public class ContainerScheduler extends AbstractService implements if (queuedOpportunisticContainers.remove(containerId) != null) { queuedGuaranteedContainers.put(containerId, updateEvent.getContainer()); - //Kill/pause 
opportunistic containers if any to make room for - // promotion request - reclaimOpportunisticContainerResources(updateEvent.getContainer()); + Resources.addTo(guaranteedResourcesDemanded, + updateEvent.getContainer().getResource()); + startPendingContainers(true); } } else { // Demotion of queued container.. Should not happen too often // since you should not find too many queued guaranteed // containers if (queuedGuaranteedContainers.remove(containerId) != null) { + Resources.subtractFrom(guaranteedResourcesDemanded, + updateEvent.getContainer().getResource()); queuedOpportunisticContainers.put(containerId, updateEvent.getContainer()); + startPendingContainers(false); } } try { @@ -266,6 +290,7 @@ public class ContainerScheduler extends AbstractService implements || rcs == RecoveredContainerStatus.PAUSED) { if (execType == ExecutionType.GUARANTEED) { queuedGuaranteedContainers.put(container.getContainerId(), container); + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); } else if (execType == ExecutionType.OPPORTUNISTIC) { queuedOpportunisticContainers .put(container.getContainerId(), container); @@ -276,7 +301,7 @@ public class ContainerScheduler extends AbstractService implements } } else if (rcs == RecoveredContainerStatus.LAUNCHED) { runningContainers.put(container.getContainerId(), container); - utilizationTracker.addContainerResources(container); + utilizationTracker.containerLaunched(container); } } @@ -336,65 +361,107 @@ public class ContainerScheduler extends AbstractService implements } private void onResourcesReclaimed(Container container) { - oppContainersToKill.remove(container.getContainerId()); + ContainerId containerId = container.getContainerId(); + + // This could be killed externally for eg. by the ContainerManager, + // in which case, the container might still be queued. + if (queuedOpportunisticContainers.remove(containerId) != null) { + return; + } // This could be killed externally for eg. by the ContainerManager, // in which case, the container might still be queued. 
- Container queued = - queuedOpportunisticContainers.remove(container.getContainerId()); - if (queued == null) { - queuedGuaranteedContainers.remove(container.getContainerId()); + if (queuedGuaranteedContainers.remove(containerId) != null) { + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); + return; + } + + if (oppContainersToKill.remove(containerId) != null) { + Resources.subtractFrom( + opportunisticResourcesToBeReleased, container.getResource()); } // Requeue PAUSED containers if (container.getContainerState() == ContainerState.PAUSED) { if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.GUARANTEED) { - queuedGuaranteedContainers.put(container.getContainerId(), container); + queuedGuaranteedContainers.put(containerId, container); + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); } else { - queuedOpportunisticContainers.put( - container.getContainerId(), container); + queuedOpportunisticContainers.put(containerId, container); } } // decrement only if it was a running container - Container completedContainer = runningContainers.remove(container - .getContainerId()); + Container completedContainer = runningContainers.remove(containerId); // only a running container releases resources upon completion boolean resourceReleased = completedContainer != null; if (resourceReleased) { - this.utilizationTracker.subtractContainerResource(container); + this.utilizationTracker.containerReleased(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); } - boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); - startPendingContainers(forceStartGuaranteedContainers); + + // In case of over-allocation being turned on, we may need to reclaim + // more resources since the opportunistic containers that have been + // killed or paused may have not released as much resource as we need. + boolean reclaimOpportunisticResources = context.isOverAllocationEnabled(); + startPendingContainers(reclaimOpportunisticResources); } } /** * Start pending containers in the queue. - * @param forceStartGuaranteedContaieners When this is true, start guaranteed - * container without looking at available resource + * @param reclaimOpportunisticResources if set to true, resources allocated + * to running OPPORTUNISTIC containers will be reclaimed in + * cases where there are GUARANTEED containers being queued */ - private void startPendingContainers(boolean forceStartGuaranteedContaieners) { - // Start guaranteed containers that are paused, if resources available. - boolean resourcesAvailable = startContainers( - queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); - // Start opportunistic containers, if resources available. - if (resourcesAvailable) { - startContainers(queuedOpportunisticContainers.values(), false); + private void startPendingContainers(boolean reclaimOpportunisticResources) { + // When opportunistic container not allowed (which is determined by + // max-queue length of pending opportunistic containers <= 0), start + // guaranteed containers without looking at available resources and + // skip scanning the queue of opportunistic containers + if (maxOppQueueLength <= 0) { + forcefullyStartGuaranteedContainers(); + return; + } + + Resource available = utilizationTracker.getAvailableResources(); + + // Start guaranteed containers that are queued, if resources available. 
+ boolean allGuaranteedContainersLaunched = + startGuaranteedContainers(available); + // Start opportunistic containers, if resources available, which is true + // if all guaranteed containers in queue have been launched. + if (allGuaranteedContainersLaunched) { + startOpportunisticContainers(available); + } else { + // If not all guaranteed containers in queue are launched, we may need + // to reclaim resources from opportunistic containers that are running. + if (reclaimOpportunisticResources) { + reclaimOpportunisticContainerResources(); + } } } - private boolean startContainers( - Collection<Container> containersToBeStarted, boolean force) { - Iterator<Container> cIter = containersToBeStarted.iterator(); + /** + * Try to launch as many GUARANTEED containers as possible. + * @param available the amount of resources available to launch containers + * @return true if all queued GUARANTEED containers are launched + * or there is no GUARANTEED containers to launch + */ + private boolean startGuaranteedContainers(Resource available) { + Iterator<Container> cIter = + queuedGuaranteedContainers.values().iterator(); boolean resourcesAvailable = true; while (cIter.hasNext() && resourcesAvailable) { Container container = cIter.next(); - if (tryStartContainer(container, force)) { + if (isResourceAvailable(available, container)) { + startContainer(container); + Resources.subtractFrom(available, container.getResource()); cIter.remove(); + Resources.subtractFrom( + guaranteedResourcesDemanded, container.getResource()); } else { resourcesAvailable = false; } @@ -402,25 +469,49 @@ public class ContainerScheduler extends AbstractService implements return resourcesAvailable; } - private boolean tryStartContainer(Container container, boolean force) { - boolean containerStarted = false; - // call startContainer without checking available resource when force==true - if (force || resourceAvailableToStartContainer( - container)) { + /** + * Launch all queued GUARANTEED containers without checking resource + * availability. This is an optimization in cases where OPPORTUNISTIC + * containers are not allowed on the node. + */ + private void forcefullyStartGuaranteedContainers() { + Iterator<Container> cIter = + queuedGuaranteedContainers.values().iterator(); + while (cIter.hasNext()) { + Container container = cIter.next(); startContainer(container); - containerStarted = true; + cIter.remove(); + Resources.subtractFrom( + guaranteedResourcesDemanded, container.getResource()); } - return containerStarted; } - /** - * Check if there is resource available to start a given container - * immediately. (This can be extended to include overallocated resources) - * @param container the container to start - * @return true if container can be launched directly + * Try to launch as many OPPORTUNISTIC containers as possible. 
+ * @param available the amount of resources available to launch containers + * @return true if all OPPORTUNISTIC containers are launched + * or there is no OPPORTUNISTIC containers to launch */ - private boolean resourceAvailableToStartContainer(Container container) { - return this.utilizationTracker.hasResourcesAvailable(container); + private boolean startOpportunisticContainers(Resource available) { + Iterator<Container> cIter = + queuedOpportunisticContainers.values().iterator(); + boolean resourcesAvailable = true; + while (cIter.hasNext() && resourcesAvailable) { + Container container = cIter.next(); + if (isResourceAvailable(available, container)) { + startContainer(container); + Resources.subtractFrom(available, container.getResource()); + cIter.remove(); + } else { + resourcesAvailable = false; + } + } + return resourcesAvailable; + } + + private static boolean isResourceAvailable( + Resource resource, Container container) { + Resource left = Resources.subtract(resource, container.getResource()); + return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0; } private boolean enqueueContainer(Container container) { @@ -430,6 +521,7 @@ public class ContainerScheduler extends AbstractService implements boolean isQueued; if (isGuaranteedContainer) { queuedGuaranteedContainers.put(container.getContainerId(), container); + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); isQueued = true; } else { if (queuedOpportunisticContainers.size() < maxOppQueueLength) { @@ -474,18 +566,7 @@ public class ContainerScheduler extends AbstractService implements // enough number of opportunistic containers. if (isGuaranteedContainer) { enqueueContainer(container); - - // When opportunistic container not allowed (which is determined by - // max-queue length of pending opportunistic containers <= 0), start - // guaranteed containers without looking at available resources. - boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); - startPendingContainers(forceStartGuaranteedContainers); - - // if the guaranteed container is queued, we need to preempt opportunistic - // containers for make room for it - if (queuedGuaranteedContainers.containsKey(container.getContainerId())) { - reclaimOpportunisticContainerResources(container); - } + startPendingContainers(true); } else { // Given an opportunistic container, we first try to start as many queuing // guaranteed containers as possible followed by queuing opportunistic @@ -503,19 +584,19 @@ public class ContainerScheduler extends AbstractService implements } @SuppressWarnings("unchecked") - private void reclaimOpportunisticContainerResources(Container container) { + private void reclaimOpportunisticContainerResources() { List<Container> extraOppContainersToReclaim = - pickOpportunisticContainersToReclaimResources( - container.getContainerId()); - // Kill the opportunistic containers that were chosen. - for (Container contToReclaim : extraOppContainersToReclaim) { + pickOpportunisticContainersToReclaimResources(); + killOpportunisticContainers(extraOppContainersToReclaim); + } + + private void killOpportunisticContainers( + Collection<Container> containersToReclaim) { + for (Container contToReclaim : containersToReclaim) { String preemptionAction = usePauseEventForPreemption == true ? 
"paused" : - "resumed"; - LOG.info( - "Container {} will be {} to start the " - + "execution of guaranteed container {}.", - contToReclaim.getContainerId(), preemptionAction, - container.getContainerId()); + "preempted"; + LOG.info("Container {} will be {} to start the execution of guaranteed" + + " containers.", contToReclaim.getContainerId(), preemptionAction); if (usePauseEventForPreemption) { contToReclaim.sendPauseEvent( @@ -526,6 +607,8 @@ public class ContainerScheduler extends AbstractService implements "Container Killed to make room for Guaranteed Container."); } oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim); + Resources.addTo( + opportunisticResourcesToBeReleased, contToReclaim.getResource()); } } @@ -534,7 +617,7 @@ public class ContainerScheduler extends AbstractService implements // Skip to put into runningContainers and addUtilization when recover if (!runningContainers.containsKey(container.getContainerId())) { runningContainers.put(container.getContainerId(), container); - this.utilizationTracker.addContainerResources(container); + this.utilizationTracker.containerLaunched(container); } if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { @@ -543,14 +626,12 @@ public class ContainerScheduler extends AbstractService implements container.sendLaunchEvent(); } - private List<Container> pickOpportunisticContainersToReclaimResources( - ContainerId containerToStartId) { + private List<Container> pickOpportunisticContainersToReclaimResources() { // The opportunistic containers that need to be killed for the // given container to start. List<Container> extraOpportContainersToKill = new ArrayList<>(); // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( - containerToStartId); + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); // Go over the running opportunistic containers. // Use a descending iterator to kill more recently started containers. @@ -569,15 +650,19 @@ public class ContainerScheduler extends AbstractService implements continue; } extraOpportContainersToKill.add(runningCont); + // In the case of over-allocation, the running container may not + // release as much resources as it has requested, but we'll check + // again if more containers need to be killed/paused when this + // container is released. ContainersMonitor.decreaseResourceUtilization( getContainersMonitor(), resourcesToFreeUp, runningCont.getResource()); } } if (!hasSufficientResources(resourcesToFreeUp)) { - LOG.warn("There are no sufficient resources to start guaranteed [{}]" + - "at the moment. Opportunistic containers are in the process of" + - "being killed to make room.", containerToStartId); + LOG.warn("There are no sufficient resources to start guaranteed" + + " containers at the moment. Opportunistic containers are in" + + " the process of being killed to make room."); } return extraOpportContainersToKill; } @@ -592,34 +677,42 @@ public class ContainerScheduler extends AbstractService implements * getContainersMonitor().getVCoresAllocatedForContainers()) <= 0; } - private ResourceUtilization resourcesToFreeUp( - ContainerId containerToStartId) { + /** + * Determine how much resources are needed to be freed up to launch the given + * GUARANTEED container. Used to determine how many running OPPORTUNISTIC + * containers need to be killed/paused, assuming OPPORTUNISTIC containers to + * be killed/paused will release the amount of resources they have requested. 
+ * + * If the node is over-allocating itself, this may cause not enough + * OPPORTUNISTIC containers being killed/paused in cases where the running + * OPPORTUNISTIC containers are not consuming fully their resource requests. + * We'd check again upon container completion events to see if more running + * OPPORTUNISTIC containers need to be killed/paused. + * + * @return the amount of resource needed to be reclaimed for this container + */ + private ResourceUtilization resourcesToFreeUp() { // Get allocation of currently allocated containers. ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization - .newInstance(this.utilizationTracker.getCurrentUtilization()); - - // Add to the allocation the allocation of the pending guaranteed - // containers that will start before the current container will be started. - for (Container container : queuedGuaranteedContainers.values()) { - ContainersMonitor.increaseResourceUtilization( - getContainersMonitor(), resourceAllocationToFreeUp, - container.getResource()); - if (container.getContainerId().equals(containerToStartId)) { - break; - } - } + .newInstance(0, 0, 0.0f); + + // Add to the allocation the allocation of pending guaranteed containers. + ContainersMonitor.increaseResourceUtilization(getContainersMonitor(), + resourceAllocationToFreeUp, guaranteedResourcesDemanded); // These resources are being freed, likely at the behest of another // guaranteed container.. - for (Container container : oppContainersToKill.values()) { - ContainersMonitor.decreaseResourceUtilization( - getContainersMonitor(), resourceAllocationToFreeUp, - container.getResource()); + ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(), + resourceAllocationToFreeUp, opportunisticResourcesToBeReleased); + + // Deduct any remaining resources available + Resource availableResources = utilizationTracker.getAvailableResources(); + if (availableResources.getVirtualCores() > 0 && + availableResources.getMemorySize() > 0) { + ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(), + resourceAllocationToFreeUp, availableResources); } - // Subtract the overall node resources. 
- getContainersMonitor().subtractNodeResourcesFromResourceUtilization( - resourceAllocationToFreeUp); return resourceAllocationToFreeUp; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 294eddf..9ad4f91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -28,5 +28,7 @@ public enum ContainerSchedulerEventType { // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, CONTAINER_PAUSED, - RECOVERY_COMPLETED + RECOVERY_COMPLETED, + // Producer: Containers Monitor when over-allocation is on + SCHEDULE_CONTAINERS } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java new file mode 100644 index 0000000..58b73d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * Keeps track of containers utilization over time and determines how much + * resources are available to launch containers when over-allocation is on. + */ +public abstract class NMAllocationPolicy { + protected final ResourceThresholds overAllocationThresholds; + protected final ContainersMonitor containersMonitor; + + public NMAllocationPolicy( + ResourceThresholds overAllocationThresholds, + ContainersMonitor containersMonitor) { + this.containersMonitor = containersMonitor; + this.overAllocationThresholds = overAllocationThresholds; + } + + /** + * Handle container launch events. + * @param container the container that has been launched + */ + public void containerLaunched(Container container) { + + } + + /** + * Handle container release events. + * @param container the container that has been released + */ + public void containerReleased(Container container) { + + } + + /** + * Get the amount of resources to launch containers when + * over-allocation is turned on. + * @return the amount of resources available to launch containers + */ + public abstract Resource getAvailableResources(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 3c17eca..98d99c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -38,22 +39,20 @@ public interface ResourceUtilizationTracker { ResourceUtilization getCurrentUtilization(); /** - * Add Container's resources to Node Utilization. - * @param container Container. + * Get the amount of resources currently available to launch containers. + * @return Resource resources available to launch containers */ - void addContainerResources(Container container); + Resource getAvailableResources(); /** - * Subtract Container's resources to Node Utilization. + * Add Container's resources to Node Utilization upon container launch. 
* @param container Container. */ - void subtractContainerResource(Container container); + void containerLaunched(Container container); /** - * Check if NM has resources available currently to run the container. + * Subtract Container's resources to Node Utilization upon container release. * @param container Container. - * @return True, if NM has resources available currently to run the container. */ - boolean hasResourcesAvailable(Container container); - + void containerReleased(Container container); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java new file mode 100644 index 0000000..f486506 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * An implementation of NMAllocationPolicy based on the + * snapshot of the latest containers utilization to determine how much + * resources are available * to launch containers when over-allocation + * is turned on. 
+ */ +public class SnapshotBasedOverAllocationPolicy + extends NMAllocationPolicy { + + public SnapshotBasedOverAllocationPolicy( + ResourceThresholds overAllocationThresholds, + ContainersMonitor containersMonitor) { + super(overAllocationThresholds, containersMonitor); + } + + @Override + public Resource getAvailableResources() { + ResourceUtilization utilization = + containersMonitor.getContainersUtilization(true).getUtilization(); + long memoryAvailable = Math.round( + overAllocationThresholds.getMemoryThreshold() * + containersMonitor.getPmemAllocatedForContainers()) - + (utilization.getPhysicalMemory() << 20); + int vcoreAvailable = Math.round( + (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) * + containersMonitor.getVCoresAllocatedForContainers()); + return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java new file mode 100644 index 0000000..6f9bc82 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** +* An resource availability tracker that determines if there are resources +* available based on if there are unallocated resources or if there are +* un-utilized resources. 
+*/ +public class UtilizationBasedResourceTracker + extends AllocationBasedResourceTracker { + private static final Logger LOG = + LoggerFactory.getLogger(AllocationBasedResourceTracker.class); + + private final NMAllocationPolicy overAllocationPolicy; + + UtilizationBasedResourceTracker(ContainerScheduler scheduler) { + super(scheduler); + this.overAllocationPolicy = + getContainersMonitor().getContainerOverAllocationPolicy(); + } + + @Override + public void containerLaunched(Container container) { + super.containerLaunched(container); + if (overAllocationPolicy != null) { + overAllocationPolicy.containerLaunched(container); + } + } + + @Override + public void containerReleased(Container container) { + super.containerReleased(container); + if (overAllocationPolicy != null) { + overAllocationPolicy.containerReleased(container); + } + } + + @Override + public Resource getAvailableResources() { + Resource resourceBasedOnAllocation = getUnallocatedResources(); + Resource resourceBasedOnUtilization = + getResourcesAvailableBasedOnUtilization(); + if (LOG.isDebugEnabled()) { + LOG.debug("The amount of resources available based on allocation is " + + resourceBasedOnAllocation + ", based on utilization is " + + resourceBasedOnUtilization); + } + + return Resources.componentwiseMax(resourceBasedOnAllocation, + resourceBasedOnUtilization); + } + + /** + * Get the amount of resources based on the slack between + * the actual utilization and desired utilization. + * @return Resource resource available + */ + private Resource getResourcesAvailableBasedOnUtilization() { + if (overAllocationPolicy == null) { + return Resources.none(); + } + + return overAllocationPolicy.getAvailableResources(); + } + + @Override + public ResourceUtilization getCurrentUtilization() { + return getContainersMonitor().getContainersUtilization(false) + .getUtilization(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 93d0afb..05e9dd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -345,6 +346,40 @@ public abstract class BaseContainerManagerTest { fStates.contains(containerStatus.getState())); } + public static void waitForContainerSubState( + ContainerManagementProtocol containerManager, ContainerId containerID, + ContainerSubState finalState) + throws 
InterruptedException, YarnException, IOException { + waitForContainerSubState(containerManager, containerID, + Arrays.asList(finalState), 20); + } + public static void waitForContainerSubState( + ContainerManagementProtocol containerManager, ContainerId containerID, + List<ContainerSubState> finalStates, int timeOutMax) + throws InterruptedException, YarnException, IOException { + List<ContainerId> list = new ArrayList<>(); + list.add(containerID); + GetContainerStatusesRequest request = + GetContainerStatusesRequest.newInstance(list); + ContainerStatus containerStatus; + HashSet<ContainerSubState> fStates = new HashSet<>(finalStates); + int timeoutSecs = 0; + do { + Thread.sleep(1000); + containerStatus = + containerManager.getContainerStatuses(request) + .getContainerStatuses().get(0); + LOG.info("Waiting for container to get into one of states " + fStates + + ". Current state is " + containerStatus.getContainerSubState()); + timeoutSecs += 1; + } while (!fStates.contains(containerStatus.getContainerSubState()) + && timeoutSecs < timeOutMax); + LOG.info("Container state is " + containerStatus.getContainerSubState()); + Assert.assertTrue("ContainerSubState is not correct (timedout)", + fStates.contains(containerStatus.getContainerSubState())); + } + + public static void waitForApplicationState( ContainerManagerImpl containerManager, ApplicationId appID, ApplicationState finalState) http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index 8aee532..c071283 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -288,7 +288,7 @@ public class TestContainersMonitorResourceChange { // will be 0. assertEquals( "Resource utilization must be default with MonitorThread's first run", - 0, containersMonitor.getContainersUtilization() + 0, containersMonitor.getContainersUtilization(false).getUtilization() .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))); // Verify the container utilization value. 
Since atleast one round is done, @@ -303,8 +303,9 @@ public class TestContainersMonitorResourceChange { ContainersMonitorImpl containersMonitor, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; - while (0 == containersMonitor.getContainersUtilization() - .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) { + while (0 == containersMonitor.getContainersUtilization(false) + .getUtilization().compareTo( + ResourceUtilization.newInstance(0, 0, 0.0f))) { if (timeWaiting >= timeoutMsecs) { break; } @@ -316,7 +317,7 @@ public class TestContainersMonitorResourceChange { } assertTrue("Resource utilization is not changed from second run onwards", - 0 != containersMonitor.getContainersUtilization() + 0 != containersMonitor.getContainersUtilization(false).getUtilization() .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java new file mode 100644 index 0000000..1e8bfdf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for the {@link AllocationBasedResourceTracker} class. + */ +public class TestAllocationBasedResourceTracker { + + private ContainerScheduler mockContainerScheduler; + + @Before + public void setup() { + mockContainerScheduler = mock(ContainerScheduler.class); + ContainersMonitor containersMonitor = + new ContainersMonitorImpl(mock(ContainerExecutor.class), + mock(AsyncDispatcher.class), mock(Context.class)); + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); + conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f); + conf.setInt(YarnConfiguration.NM_VCORES, 8); + containersMonitor.init(conf); + when(mockContainerScheduler.getContainersMonitor()) + .thenReturn(containersMonitor); + } + + /** + * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the + * hasResourceAvailable should return false. 
+ */ + @Test + public void testHasResourcesAvailable() { + AllocationBasedResourceTracker tracker = + new AllocationBasedResourceTracker(mockContainerScheduler); + Container testContainer = mock(Container.class); + when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + isResourcesAvailable(tracker.getAvailableResources(), testContainer)); + tracker.containerLaunched(testContainer); + } + Assert.assertFalse( + isResourcesAvailable(tracker.getAvailableResources(), testContainer)); + } + + private static boolean isResourcesAvailable( + Resource available, Container container) { + return available.compareTo(container.getResource()) >= 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6acfc44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java deleted file mode 100644 index 82c2147..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests for the {@link AllocationBasedResourceUtilizationTracker} class. 
- */ -public class TestAllocationBasedResourceUtilizationTracker { - - private ContainerScheduler mockContainerScheduler; - - @Before - public void setup() { - mockContainerScheduler = mock(ContainerScheduler.class); - ContainersMonitor containersMonitor = - new ContainersMonitorImpl(mock(ContainerExecutor.class), - mock(AsyncDispatcher.class), mock(Context.class)); - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024); - conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); - conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f); - conf.setInt(YarnConfiguration.NM_VCORES, 8); - containersMonitor.init(conf); - when(mockContainerScheduler.getContainersMonitor()) - .thenReturn(containersMonitor); - } - - /** - * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the - * hasResourceAvailable should return false. - */ - @Test - public void testHasResourcesAvailable() { - AllocationBasedResourceUtilizationTracker tracker = - new AllocationBasedResourceUtilizationTracker(mockContainerScheduler); - Container testContainer = mock(Container.class); - when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); - for (int i = 0; i < 2; i++) { - Assert.assertTrue(tracker.hasResourcesAvailable(testContainer)); - tracker.addContainerResources(testContainer); - } - Assert.assertFalse(tracker.hasResourcesAvailable(testContainer)); - } - - /** - * Test the case where the current allocation has been truncated to 0.8888891 - * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return - * true. - */ - @Test - public void testHasEnoughCpu() { - AllocationBasedResourceUtilizationTracker tracker = - new AllocationBasedResourceUtilizationTracker(mockContainerScheduler); - float currentAllocation = 0.8888891f; - long totalCores = 9; - int alreadyUsedCores = 8; - Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores, - (int) totalCores - alreadyUsedCores)); - Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores, - (int) totalCores - alreadyUsedCores + 1)); - } -}
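----------------------------------------------------------------------

A note for readers of this patch: the over-allocation headroom computation is pluggable through the new NMAllocationPolicy abstract class added above, with SnapshotBasedOverAllocationPolicy as the one implementation in this commit. The sketch below is not part of the commit and only illustrates that extension point; the class name FixedHeadroomPolicy and its constant headroom are invented for the example, and it compiles only against the nodemanager classes introduced here.

package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;

/**
 * Hypothetical example only: a policy that always advertises a fixed amount
 * of over-allocation headroom, ignoring the utilization snapshot.
 */
public class FixedHeadroomPolicy extends NMAllocationPolicy {

  public FixedHeadroomPolicy(ResourceThresholds overAllocationThresholds,
      ContainersMonitor containersMonitor) {
    super(overAllocationThresholds, containersMonitor);
  }

  @Override
  public Resource getAvailableResources() {
    // A real policy would consult containersMonitor, as
    // SnapshotBasedOverAllocationPolicy does above; the constant 512 MB and
    // 1 vcore of headroom here is purely illustrative.
    return Resource.newInstance(512, 1);
  }
}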
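The memory term in SnapshotBasedOverAllocationPolicy#getAvailableResources() is computed in bytes (hence the 20-bit shifts) and the CPU term as a fraction of the vcores allocated to containers. The self-contained sketch below walks through that formula with made-up numbers (8 GiB and 8 vcores allocated to containers, 0.75 memory and 0.8 CPU thresholds, 3072 MB and 25 percent CPU currently utilized); the class name is invented, and the explicit double cast is added here only to keep the standalone arithmetic within long range.

public class OverAllocationHeadroomExample {
  public static void main(String[] args) {
    long pmemAllocatedBytes = 8L * 1024 * 1024 * 1024; // 8 GiB allocated to containers
    long vcoresAllocated = 8;                          // vcores allocated to containers
    float memoryThreshold = 0.75f;                     // over-allocation memory threshold
    float cpuThreshold = 0.8f;                         // over-allocation CPU threshold
    long utilizedMemoryMB = 3072;                      // snapshot: 3 GiB of RSS in use
    float utilizedCpu = 0.25f;                         // snapshot: 25% of the node's CPU

    // Memory headroom: the threshold share of the allocated bytes minus the
    // utilized bytes, converted back to MB with a 20-bit shift.
    long memoryAvailableBytes =
        Math.round((double) memoryThreshold * pmemAllocatedBytes)
            - (utilizedMemoryMB << 20);
    long memoryAvailableMB = memoryAvailableBytes >> 20;  // 3072 MB

    // CPU headroom: the remaining fraction under the threshold, scaled by the
    // allocated vcores and rounded.
    int vcoresAvailable =
        Math.round((cpuThreshold - utilizedCpu) * vcoresAllocated);  // 4 vcores

    System.out.println(memoryAvailableMB + " MB, " + vcoresAvailable + " vcores");
  }
}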