Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."
This reverts commit c1362b68af03c546f4c0802758e9729d2372ab6c. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1235cf5d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1235cf5d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1235cf5d Branch: refs/heads/YARN-1011 Commit: 1235cf5deb9f309e975c4f59ee27caf1282f19f9 Parents: 8a7c172 Author: Haibo Chen <haiboc...@apache.org> Authored: Tue May 8 13:50:06 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Tue Jun 5 10:46:39 2018 -0700 ---------------------------------------------------------------------- .../nodemanager/NodeStatusUpdaterImpl.java | 2 +- .../containermanager/ContainerManagerImpl.java | 8 +- .../launcher/ContainerLaunch.java | 2 +- .../launcher/ContainersLauncher.java | 9 +- .../monitor/ContainersMonitor.java | 38 +- .../monitor/ContainersMonitorImpl.java | 56 +- .../AllocationBasedResourceTracker.java | 114 -- ...locationBasedResourceUtilizationTracker.java | 158 +++ .../scheduler/ContainerScheduler.java | 317 ++--- .../scheduler/ContainerSchedulerEventType.java | 4 +- .../scheduler/NMAllocationPolicy.java | 63 - .../scheduler/ResourceUtilizationTracker.java | 17 +- .../SnapshotBasedOverAllocationPolicy.java | 54 - .../UtilizationBasedResourceTracker.java | 95 -- .../BaseContainerManagerTest.java | 35 - .../TestContainersMonitorResourceChange.java | 9 +- .../TestAllocationBasedResourceTracker.java | 82 -- ...locationBasedResourceUtilizationTracker.java | 93 ++ .../TestContainerSchedulerRecovery.java | 58 +- ...estContainerSchedulerWithOverAllocation.java | 1121 ------------------ 20 files changed, 419 insertions(+), 1916 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index a2f70d5..44f9740 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -532,7 +532,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private ResourceUtilization getContainersUtilization() { ContainersMonitor containersMonitor = this.context.getContainerManager().getContainersMonitor(); - return containersMonitor.getContainersUtilization(false).getUtilization(); + return containersMonitor.getContainersUtilization(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 77ea4fa..3470910 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -243,12 +243,6 @@ public class ContainerManagerImpl extends CompositeService implements metrics); addService(rsrcLocalizationSrvc); - this.containersMonitor = createContainersMonitor(exec); - addService(this.containersMonitor); - - // ContainersLauncher must be added after ContainersMonitor - // because the former depends on the latter to initialize - // over-allocation first. containersLauncher = createContainersLauncher(context, exec); addService(containersLauncher); @@ -273,6 +267,8 @@ public class ContainerManagerImpl extends CompositeService implements nmMetricsPublisher = createNMTimelinePublisher(context); context.setNMTimelinePublisher(nmMetricsPublisher); } + this.containersMonitor = createContainersMonitor(exec); + addService(this.containersMonitor); dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 269e319..57abfc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -1095,7 +1095,7 @@ public class ContainerLaunch implements Callable<Integer> { * @return Process ID * @throws Exception */ - protected String getContainerPid(Path pidFilePath) throws Exception { + private String getContainerPid(Path pidFilePath) throws Exception { String containerIdStr = container.getContainerId().toString(); String processId = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index c3d0a4d..cfd5d6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -116,7 +116,8 @@ public class ContainersLauncher extends AbstractService containerId.getApplicationAttemptId().getApplicationId()); ContainerLaunch launch = - createContainerLaunch(app, event.getContainer()); + new ContainerLaunch(context, getConfig(), dispatcher, exec, app, + event.getContainer(), dirsHandler, containerManager); containerLauncher.submit(launch); running.put(containerId, launch); break; @@ -212,10 +213,4 @@ public class ContainersLauncher extends AbstractService break; } } - - protected ContainerLaunch createContainerLaunch( - Application app, Container container) { - return new ContainerLaunch(context, getConfig(), dispatcher, - exec, app, container, dirsHandler, containerManager); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 8da4ec4..64831e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -23,24 +23,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; public interface ContainersMonitor extends Service, EventHandler<ContainersMonitorEvent>, ResourceView { - - /** - * Get the aggregate resource utilization of containers running on the node, - * with a timestamp of the measurement. - * @param latest true if the latest result should be returned - * @return ResourceUtilization resource utilization of all containers - */ - ContainersResourceUtilization getContainersUtilization(boolean latest); - - /** - * Get the policy to over-allocate containers when over-allocation is on. - * @return null if over-allocation is turned off - */ - NMAllocationPolicy getContainerOverAllocationPolicy(); + ResourceUtilization getContainersUtilization(); float getVmemRatio(); @@ -80,26 +66,4 @@ public interface ContainersMonitor extends Service, * containersMonitor.getVmemRatio()); resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); } - - /** - * A snapshot of resource utilization of all containers with the timestamp. 
- */ - final class ContainersResourceUtilization { - private final ResourceUtilization utilization; - private final long timestamp; - - public ContainersResourceUtilization( - ResourceUtilization utilization, long timestamp) { - this.utilization = utilization; - this.timestamp = timestamp; - } - - public long getTimestamp() { - return timestamp; - } - - public ResourceUtilization getUtilization() { - return utilization; - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 8e9bad8..a133117 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -24,10 +24,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; -import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,9 +110,8 @@ public class ContainersMonitorImpl extends AbstractService implements CPU, MEMORY } - private ContainersResourceUtilization latestContainersUtilization; + private ResourceUtilization containersUtilization; - private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; private int overAlloctionPreemptionCpuCount = -1; @@ -132,8 +127,7 @@ public class ContainersMonitorImpl extends AbstractService implements this.monitoringThread = new MonitoringThread(); - this.latestContainersUtilization = new ContainersResourceUtilization( - ResourceUtilization.newInstance(-1, -1, -1.0f), -1L); + this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); } @Override @@ -369,10 +363,6 @@ public class ContainersMonitorImpl extends AbstractService implements this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance( cpuPreemptionThreshold, memoryPreemptionThreshold); - // TODO make this configurable - this.overAllocationPolicy = - createOverAllocationPolicy(resourceThresholds); - LOG.info("NodeManager oversubscription enabled with overallocation " + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" + @@ -380,11 +370,6 @@ public class ContainersMonitorImpl extends AbstractService implements cpuPreemptionThreshold + ")"); } - protected NMAllocationPolicy 
createOverAllocationPolicy( - ResourceThresholds resourceThresholds) { - return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this); - } - private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this @@ -668,12 +653,7 @@ public class ContainersMonitorImpl extends AbstractService implements } // Save the aggregated utilization of the containers - setLatestContainersUtilization(trackedContainersUtilization); - - // check opportunity to start containers if over-allocation is on - if (context.isOverAllocationEnabled()) { - attemptToStartContainersUponLowUtilization(); - } + setContainersUtilization(trackedContainersUtilization); // Publish the container utilization metrics to node manager // metrics system. @@ -1043,34 +1023,12 @@ public class ContainersMonitorImpl extends AbstractService implements } @Override - public ContainersResourceUtilization getContainersUtilization( - boolean latest) { - // TODO update containerUtilization if latest is true - return this.latestContainersUtilization; - } - - @Override - public NMAllocationPolicy getContainerOverAllocationPolicy() { - return overAllocationPolicy; - } - - private void setLatestContainersUtilization(ResourceUtilization utilization) { - this.latestContainersUtilization = new ContainersResourceUtilization( - utilization, System.currentTimeMillis()); + public ResourceUtilization getContainersUtilization() { + return this.containersUtilization; } - @VisibleForTesting - public void attemptToStartContainersUponLowUtilization() { - if (getContainerOverAllocationPolicy() != null) { - Resource available = getContainerOverAllocationPolicy() - .getAvailableResources(); - if (available.getMemorySize() > 0 && - available.getVirtualCores() > 0) { - eventDispatcher.getEventHandler().handle( - new ContainerSchedulerEvent(null, - ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); - } - } + private void 
setContainersUtilization(ResourceUtilization utilization) { + this.containersUtilization = utilization; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java deleted file mode 100644 index 86b3698..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the resource utilization tracker that equates - * resource utilization with the total resource allocated to the container. - */ -public class AllocationBasedResourceTracker - implements ResourceUtilizationTracker { - - private static final Logger LOG = - LoggerFactory.getLogger(AllocationBasedResourceTracker.class); - - private static final Resource UNAVAILABLE = - Resource.newInstance(0, 0); - - private ResourceUtilization containersAllocation; - private ContainerScheduler scheduler; - - - AllocationBasedResourceTracker(ContainerScheduler scheduler) { - this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); - this.scheduler = scheduler; - } - - /** - * Get the accumulation of totally allocated resources to containers. - * @return ResourceUtilization Resource Utilization. - */ - @Override - public ResourceUtilization getCurrentUtilization() { - return this.containersAllocation; - } - - /** - * Get the amount of resources that have not been allocated to containers. - * @return Resource resources that have not been allocated to containers. 
- */ - protected Resource getUnallocatedResources() { - // unallocated resources = node capacity - containers allocation - // = -(container allocation - node capacity) - ResourceUtilization allocationClone = - ResourceUtilization.newInstance(containersAllocation); - getContainersMonitor() - .subtractNodeResourcesFromResourceUtilization(allocationClone); - - Resource unallocated = UNAVAILABLE; - if (allocationClone.getCPU() <= 0 && - allocationClone.getPhysicalMemory() <= 0 && - allocationClone.getVirtualMemory() <= 0) { - int cpu = Math.round(allocationClone.getCPU() * - getContainersMonitor().getVCoresAllocatedForContainers()); - long memory = allocationClone.getPhysicalMemory(); - unallocated = Resource.newInstance(-memory, -cpu); - } - return unallocated; - } - - - @Override - public Resource getAvailableResources() { - return getUnallocatedResources(); - } - - /** - * Add Container's resources to the accumulated allocation. - * @param container Container. - */ - @Override - public void containerLaunched(Container container) { - ContainersMonitor.increaseResourceUtilization( - getContainersMonitor(), this.containersAllocation, - container.getResource()); - } - - /** - * Subtract Container's resources to the accumulated allocation. - * @param container Container. 
- */ - @Override - public void containerReleased(Container container) { - ContainersMonitor.decreaseResourceUtilization( - getContainersMonitor(), this.containersAllocation, - container.getResource()); - } - - public ContainersMonitor getContainersMonitor() { - return this.scheduler.getContainersMonitor(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java new file mode 100644 index 0000000..6e2b617 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the {@link ResourceUtilizationTracker} that equates + * resource utilization with the total resource allocated to the container. + */ +public class AllocationBasedResourceUtilizationTracker implements + ResourceUtilizationTracker { + + private static final Logger LOG = + LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class); + + private ResourceUtilization containersAllocation; + private ContainerScheduler scheduler; + + AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) { + this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); + this.scheduler = scheduler; + } + + /** + * Get the accumulation of totally allocated resources to a container. + * @return ResourceUtilization Resource Utilization. + */ + @Override + public ResourceUtilization getCurrentUtilization() { + return this.containersAllocation; + } + + /** + * Add Container's resources to the accumulated Utilization. + * @param container Container. 
+ */ + @Override + public void addContainerResources(Container container) { + ContainersMonitor.increaseResourceUtilization( + getContainersMonitor(), this.containersAllocation, + container.getResource()); + } + + /** + * Subtract Container's resources from the accumulated Utilization. + * @param container Container. + */ + @Override + public void subtractContainerResource(Container container) { + ContainersMonitor.decreaseResourceUtilization( + getContainersMonitor(), this.containersAllocation, + container.getResource()); + } + + /** + * Check if NM has resources available currently to run the container. + * @param container Container. + * @return True, if NM has resources available currently to run the container. + */ + @Override + public boolean hasResourcesAvailable(Container container) { + long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L; + return hasResourcesAvailable(pMemBytes, + (long) (getContainersMonitor().getVmemRatio()* pMemBytes), + container.getResource().getVirtualCores()); + } + + private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes, + int cpuVcores) { + // Check physical memory. + if (LOG.isDebugEnabled()) { + LOG.debug("pMemCheck [current={} + asked={} > allowed={}]", + this.containersAllocation.getPhysicalMemory(), + (pMemBytes >> 20), + (getContainersMonitor().getPmemAllocatedForContainers() >> 20)); + } + if (this.containersAllocation.getPhysicalMemory() + + (int) (pMemBytes >> 20) > + (int) (getContainersMonitor() + .getPmemAllocatedForContainers() >> 20)) { + return false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("before vMemCheck" + + "[isEnabled={}, current={} + asked={} > allowed={}]", + getContainersMonitor().isVmemCheckEnabled(), + this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20), + (getContainersMonitor().getVmemAllocatedForContainers() >> 20)); + } + // Check virtual memory. 
+ if (getContainersMonitor().isVmemCheckEnabled() && + this.containersAllocation.getVirtualMemory() + + (int) (vMemBytes >> 20) > + (int) (getContainersMonitor() + .getVmemAllocatedForContainers() >> 20)) { + return false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("before cpuCheck [asked={} > allowed={}]", + this.containersAllocation.getCPU(), + getContainersMonitor().getVCoresAllocatedForContainers()); + } + // Check CPU. Compare using integral values of cores to avoid decimal + // inaccuracies. + if (!hasEnoughCpu(this.containersAllocation.getCPU(), + getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) { + return false; + } + return true; + } + + /** + * Returns whether there is enough space for coresRequested in totalCores. + * Converts currentAllocation usage to nearest integer count before comparing, + * as floats are inherently imprecise. NOTE: this calculation assumes that + * requested core counts must be integers, and currentAllocation core count + * must also be an integer. + * + * @param currentAllocation The current allocation, a float value from 0 to 1. + * @param totalCores The total cores in the system. + * @param coresRequested The number of cores requested. + * @return True if round(currentAllocation * totalCores) + coresRequested <= + * totalCores. + */ + public boolean hasEnoughCpu(float currentAllocation, long totalCores, + int coresRequested) { + // Must not cast here, as it would truncate the decimal digits. 
+ return Math.round(currentAllocation * totalCores) + + coresRequested <= totalCores; + } + + public ContainersMonitor getContainersMonitor() { + return this.scheduler.getContainersMonitor(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index f5a86e9..5cdcf41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -24,7 +24,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; -import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,10 +77,6 @@ public class ContainerScheduler extends AbstractService implements // Queue of Guaranteed Containers waiting for resources to run private final LinkedHashMap<ContainerId, Container> queuedGuaranteedContainers = new LinkedHashMap<>(); - // sum of the resources requested by guaranteed containers in queue - private final Resource guaranteedResourcesDemanded = - Resource.newInstance(0, 0); - // Queue of Opportunistic Containers waiting for resources to run private final LinkedHashMap<ContainerId, Container> queuedOpportunisticContainers = new LinkedHashMap<>(); @@ -91,10 +85,6 @@ public class ContainerScheduler extends AbstractService implements // or paused to make room for a guaranteed container. private final Map<ContainerId, Container> oppContainersToKill = new HashMap<>(); - // sum of the resources to be released by opportunistic containers that - // have been marked to be killed or paused. - private final Resource opportunisticResourcesToBeReleased = - Resource.newInstance(0, 0); // Containers launched by the Scheduler will take a while to actually // move to the RUNNING state, but should still be fair game for killing @@ -135,17 +125,6 @@ public class ContainerScheduler extends AbstractService implements DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)); } - @VisibleForTesting - public ContainerScheduler(Context context, AsyncDispatcher dispatcher, - NodeManagerMetrics metrics, int qLength) { - super(ContainerScheduler.class.getName()); - this.context = context; - this.dispatcher = dispatcher; - this.metrics = metrics; - this.maxOppQueueLength = (qLength <= 0) ? 
0 : qLength; - this.opportunisticContainersStatus = - OpportunisticContainersStatus.newInstance(); - } @Override public void serviceInit(Configuration conf) throws Exception { @@ -173,16 +152,20 @@ public class ContainerScheduler extends AbstractService implements YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION, YarnConfiguration. DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION); - // We assume over allocation configurations have been initialized - this.utilizationTracker = getResourceTracker(); } - private AllocationBasedResourceTracker getResourceTracker() { - if (context.isOverAllocationEnabled()) { - return new UtilizationBasedResourceTracker(this); - } else { - return new AllocationBasedResourceTracker(this); - } + @VisibleForTesting + public ContainerScheduler(Context context, AsyncDispatcher dispatcher, + NodeManagerMetrics metrics, int qLength) { + super(ContainerScheduler.class.getName()); + this.context = context; + this.dispatcher = dispatcher; + this.metrics = metrics; + this.maxOppQueueLength = (qLength <= 0) ? 
0 : qLength; + this.utilizationTracker = + new AllocationBasedResourceUtilizationTracker(this); + this.opportunisticContainersStatus = + OpportunisticContainersStatus.newInstance(); } /** @@ -205,18 +188,14 @@ public class ContainerScheduler extends AbstractService implements if (event instanceof UpdateContainerSchedulerEvent) { onUpdateContainer((UpdateContainerSchedulerEvent) event); } else { - LOG.error("Unknown event type on UpdateContainer: " + event.getType()); + LOG.error("Unknown event type on UpdateCOntainer: " + event.getType()); } break; case SHED_QUEUED_CONTAINERS: shedQueuedOpportunisticContainers(); break; case RECOVERY_COMPLETED: - startPendingContainers(false); - break; - case SCHEDULE_CONTAINERS: - startPendingContainers(true); - break; + startPendingContainers(maxOppQueueLength <= 0); default: LOG.error("Unknown event arrived at ContainerScheduler: " + event.toString()); @@ -231,10 +210,10 @@ public class ContainerScheduler extends AbstractService implements ContainerId containerId = updateEvent.getContainer().getContainerId(); if (updateEvent.isResourceChange()) { if (runningContainers.containsKey(containerId)) { - this.utilizationTracker.containerReleased( + this.utilizationTracker.subtractContainerResource( new ContainerImpl(getConfig(), null, null, null, null, updateEvent.getOriginalToken(), context)); - this.utilizationTracker.containerLaunched( + this.utilizationTracker.addContainerResources( updateEvent.getContainer()); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent(containerId, @@ -250,20 +229,17 @@ public class ContainerScheduler extends AbstractService implements if (queuedOpportunisticContainers.remove(containerId) != null) { queuedGuaranteedContainers.put(containerId, updateEvent.getContainer()); - Resources.addTo(guaranteedResourcesDemanded, - updateEvent.getContainer().getResource()); - startPendingContainers(true); + //Kill/pause opportunistic containers if any to make room for + // promotion request + 
reclaimOpportunisticContainerResources(updateEvent.getContainer()); } } else { // Demotion of queued container.. Should not happen too often // since you should not find too many queued guaranteed // containers if (queuedGuaranteedContainers.remove(containerId) != null) { - Resources.subtractFrom(guaranteedResourcesDemanded, - updateEvent.getContainer().getResource()); queuedOpportunisticContainers.put(containerId, updateEvent.getContainer()); - startPendingContainers(false); } } try { @@ -290,7 +266,6 @@ public class ContainerScheduler extends AbstractService implements || rcs == RecoveredContainerStatus.PAUSED) { if (execType == ExecutionType.GUARANTEED) { queuedGuaranteedContainers.put(container.getContainerId(), container); - Resources.addTo(guaranteedResourcesDemanded, container.getResource()); } else if (execType == ExecutionType.OPPORTUNISTIC) { queuedOpportunisticContainers .put(container.getContainerId(), container); @@ -301,7 +276,7 @@ public class ContainerScheduler extends AbstractService implements } } else if (rcs == RecoveredContainerStatus.LAUNCHED) { runningContainers.put(container.getContainerId(), container); - utilizationTracker.containerLaunched(container); + utilizationTracker.addContainerResources(container); } } @@ -361,107 +336,65 @@ public class ContainerScheduler extends AbstractService implements } private void onResourcesReclaimed(Container container) { - ContainerId containerId = container.getContainerId(); - - // This could be killed externally for eg. by the ContainerManager, - // in which case, the container might still be queued. - if (queuedOpportunisticContainers.remove(containerId) != null) { - return; - } + oppContainersToKill.remove(container.getContainerId()); // This could be killed externally for eg. by the ContainerManager, // in which case, the container might still be queued. 
- if (queuedGuaranteedContainers.remove(containerId) != null) { - Resources.addTo(guaranteedResourcesDemanded, container.getResource()); - return; - } - - if (oppContainersToKill.remove(containerId) != null) { - Resources.subtractFrom( - opportunisticResourcesToBeReleased, container.getResource()); + Container queued = + queuedOpportunisticContainers.remove(container.getContainerId()); + if (queued == null) { + queuedGuaranteedContainers.remove(container.getContainerId()); } // Requeue PAUSED containers if (container.getContainerState() == ContainerState.PAUSED) { if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.GUARANTEED) { - queuedGuaranteedContainers.put(containerId, container); - Resources.addTo(guaranteedResourcesDemanded, container.getResource()); + queuedGuaranteedContainers.put(container.getContainerId(), container); } else { - queuedOpportunisticContainers.put(containerId, container); + queuedOpportunisticContainers.put( + container.getContainerId(), container); } } // decrement only if it was a running container - Container completedContainer = runningContainers.remove(containerId); + Container completedContainer = runningContainers.remove(container + .getContainerId()); // only a running container releases resources upon completion boolean resourceReleased = completedContainer != null; if (resourceReleased) { - this.utilizationTracker.containerReleased(container); + this.utilizationTracker.subtractContainerResource(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); } - - // In case of over-allocation being turned on, we may need to reclaim - // more resources since the opportunistic containers that have been - // killed or paused may have not released as much resource as we need. 
- boolean reclaimOpportunisticResources = context.isOverAllocationEnabled(); - startPendingContainers(reclaimOpportunisticResources); + boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); + startPendingContainers(forceStartGuaranteedContainers); } } /** * Start pending containers in the queue. - * @param reclaimOpportunisticResources if set to true, resources allocated - * to running OPPORTUNISTIC containers will be reclaimed in - * cases where there are GUARANTEED containers being queued + * @param forceStartGuaranteedContaieners When this is true, start guaranteed + * container without looking at available resource */ - private void startPendingContainers(boolean reclaimOpportunisticResources) { - // When opportunistic container not allowed (which is determined by - // max-queue length of pending opportunistic containers <= 0), start - // guaranteed containers without looking at available resources and - // skip scanning the queue of opportunistic containers - if (maxOppQueueLength <= 0) { - forcefullyStartGuaranteedContainers(); - return; - } - - Resource available = utilizationTracker.getAvailableResources(); - - // Start guaranteed containers that are queued, if resources available. - boolean allGuaranteedContainersLaunched = - startGuaranteedContainers(available); - // Start opportunistic containers, if resources available, which is true - // if all guaranteed containers in queue have been launched. - if (allGuaranteedContainersLaunched) { - startOpportunisticContainers(available); - } else { - // If not all guaranteed containers in queue are launched, we may need - // to reclaim resources from opportunistic containers that are running. - if (reclaimOpportunisticResources) { - reclaimOpportunisticContainerResources(); - } + private void startPendingContainers(boolean forceStartGuaranteedContaieners) { + // Start guaranteed containers that are paused, if resources available. 
+ boolean resourcesAvailable = startContainers( + queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); + // Start opportunistic containers, if resources available. + if (resourcesAvailable) { + startContainers(queuedOpportunisticContainers.values(), false); } } - /** - * Try to launch as many GUARANTEED containers as possible. - * @param available the amount of resources available to launch containers - * @return true if all queued GUARANTEED containers are launched - * or there is no GUARANTEED containers to launch - */ - private boolean startGuaranteedContainers(Resource available) { - Iterator<Container> cIter = - queuedGuaranteedContainers.values().iterator(); + private boolean startContainers( + Collection<Container> containersToBeStarted, boolean force) { + Iterator<Container> cIter = containersToBeStarted.iterator(); boolean resourcesAvailable = true; while (cIter.hasNext() && resourcesAvailable) { Container container = cIter.next(); - if (isResourceAvailable(available, container)) { - startContainer(container); - Resources.subtractFrom(available, container.getResource()); + if (tryStartContainer(container, force)) { cIter.remove(); - Resources.subtractFrom( - guaranteedResourcesDemanded, container.getResource()); } else { resourcesAvailable = false; } @@ -469,49 +402,25 @@ public class ContainerScheduler extends AbstractService implements return resourcesAvailable; } - /** - * Launch all queued GUARANTEED containers without checking resource - * availability. This is an optimization in cases where OPPORTUNISTIC - * containers are not allowed on the node. 
- */ - private void forcefullyStartGuaranteedContainers() { - Iterator<Container> cIter = - queuedGuaranteedContainers.values().iterator(); - while (cIter.hasNext()) { - Container container = cIter.next(); + private boolean tryStartContainer(Container container, boolean force) { + boolean containerStarted = false; + // call startContainer without checking available resource when force==true + if (force || resourceAvailableToStartContainer( + container)) { startContainer(container); - cIter.remove(); - Resources.subtractFrom( - guaranteedResourcesDemanded, container.getResource()); + containerStarted = true; } + return containerStarted; } + /** - * Try to launch as many OPPORTUNISTIC containers as possible. - * @param available the amount of resources available to launch containers - * @return true if all OPPORTUNISTIC containers are launched - * or there is no OPPORTUNISTIC containers to launch + * Check if there is resource available to start a given container + * immediately. (This can be extended to include overallocated resources) + * @param container the container to start + * @return true if container can be launched directly */ - private boolean startOpportunisticContainers(Resource available) { - Iterator<Container> cIter = - queuedOpportunisticContainers.values().iterator(); - boolean resourcesAvailable = true; - while (cIter.hasNext() && resourcesAvailable) { - Container container = cIter.next(); - if (isResourceAvailable(available, container)) { - startContainer(container); - Resources.subtractFrom(available, container.getResource()); - cIter.remove(); - } else { - resourcesAvailable = false; - } - } - return resourcesAvailable; - } - - private static boolean isResourceAvailable( - Resource resource, Container container) { - Resource left = Resources.subtract(resource, container.getResource()); - return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0; + private boolean resourceAvailableToStartContainer(Container container) { + return 
this.utilizationTracker.hasResourcesAvailable(container); } private boolean enqueueContainer(Container container) { @@ -521,7 +430,6 @@ public class ContainerScheduler extends AbstractService implements boolean isQueued; if (isGuaranteedContainer) { queuedGuaranteedContainers.put(container.getContainerId(), container); - Resources.addTo(guaranteedResourcesDemanded, container.getResource()); isQueued = true; } else { if (queuedOpportunisticContainers.size() < maxOppQueueLength) { @@ -566,7 +474,18 @@ public class ContainerScheduler extends AbstractService implements // enough number of opportunistic containers. if (isGuaranteedContainer) { enqueueContainer(container); - startPendingContainers(true); + + // When opportunistic container not allowed (which is determined by + // max-queue length of pending opportunistic containers <= 0), start + // guaranteed containers without looking at available resources. + boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); + startPendingContainers(forceStartGuaranteedContainers); + + // if the guaranteed container is queued, we need to preempt opportunistic + // containers for make room for it + if (queuedGuaranteedContainers.containsKey(container.getContainerId())) { + reclaimOpportunisticContainerResources(container); + } } else { // Given an opportunistic container, we first try to start as many queuing // guaranteed containers as possible followed by queuing opportunistic @@ -584,19 +503,19 @@ public class ContainerScheduler extends AbstractService implements } @SuppressWarnings("unchecked") - private void reclaimOpportunisticContainerResources() { + private void reclaimOpportunisticContainerResources(Container container) { List<Container> extraOppContainersToReclaim = - pickOpportunisticContainersToReclaimResources(); - killOpportunisticContainers(extraOppContainersToReclaim); - } - - private void killOpportunisticContainers( - Collection<Container> containersToReclaim) { - for (Container contToReclaim : 
containersToReclaim) { + pickOpportunisticContainersToReclaimResources( + container.getContainerId()); + // Kill the opportunistic containers that were chosen. + for (Container contToReclaim : extraOppContainersToReclaim) { String preemptionAction = usePauseEventForPreemption == true ? "paused" : - "preempted"; - LOG.info("Container {} will be {} to start the execution of guaranteed" + - " containers.", contToReclaim.getContainerId(), preemptionAction); + "resumed"; + LOG.info( + "Container {} will be {} to start the " + + "execution of guaranteed container {}.", + contToReclaim.getContainerId(), preemptionAction, + container.getContainerId()); if (usePauseEventForPreemption) { contToReclaim.sendPauseEvent( @@ -607,8 +526,6 @@ public class ContainerScheduler extends AbstractService implements "Container Killed to make room for Guaranteed Container."); } oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim); - Resources.addTo( - opportunisticResourcesToBeReleased, contToReclaim.getResource()); } } @@ -617,7 +534,7 @@ public class ContainerScheduler extends AbstractService implements // Skip to put into runningContainers and addUtilization when recover if (!runningContainers.containsKey(container.getContainerId())) { runningContainers.put(container.getContainerId(), container); - this.utilizationTracker.containerLaunched(container); + this.utilizationTracker.addContainerResources(container); } if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { @@ -626,12 +543,14 @@ public class ContainerScheduler extends AbstractService implements container.sendLaunchEvent(); } - private List<Container> pickOpportunisticContainersToReclaimResources() { + private List<Container> pickOpportunisticContainersToReclaimResources( + ContainerId containerToStartId) { // The opportunistic containers that need to be killed for the // given container to start. 
List<Container> extraOpportContainersToKill = new ArrayList<>(); // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( + containerToStartId); // Go over the running opportunistic containers. // Use a descending iterator to kill more recently started containers. @@ -650,19 +569,15 @@ public class ContainerScheduler extends AbstractService implements continue; } extraOpportContainersToKill.add(runningCont); - // In the case of over-allocation, the running container may not - // release as much resources as it has requested, but we'll check - // again if more containers need to be killed/paused when this - // container is released. ContainersMonitor.decreaseResourceUtilization( getContainersMonitor(), resourcesToFreeUp, runningCont.getResource()); } } if (!hasSufficientResources(resourcesToFreeUp)) { - LOG.warn("There are no sufficient resources to start guaranteed" + - " containers at the moment. Opportunistic containers are in" + - " the process of being killed to make room."); + LOG.warn("There are no sufficient resources to start guaranteed [{}]" + + "at the moment. Opportunistic containers are in the process of" + + "being killed to make room.", containerToStartId); } return extraOpportContainersToKill; } @@ -677,42 +592,34 @@ public class ContainerScheduler extends AbstractService implements * getContainersMonitor().getVCoresAllocatedForContainers()) <= 0; } - /** - * Determine how much resources are needed to be freed up to launch the given - * GUARANTEED container. Used to determine how many running OPPORTUNISTIC - * containers need to be killed/paused, assuming OPPORTUNISTIC containers to - * be killed/paused will release the amount of resources they have requested. 
- * - * If the node is over-allocating itself, this may cause not enough - * OPPORTUNISTIC containers being killed/paused in cases where the running - * OPPORTUNISTIC containers are not consuming fully their resource requests. - * We'd check again upon container completion events to see if more running - * OPPORTUNISTIC containers need to be killed/paused. - * - * @return the amount of resource needed to be reclaimed for this container - */ - private ResourceUtilization resourcesToFreeUp() { + private ResourceUtilization resourcesToFreeUp( + ContainerId containerToStartId) { // Get allocation of currently allocated containers. ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization - .newInstance(0, 0, 0.0f); - - // Add to the allocation the allocation of pending guaranteed containers. - ContainersMonitor.increaseResourceUtilization(getContainersMonitor(), - resourceAllocationToFreeUp, guaranteedResourcesDemanded); + .newInstance(this.utilizationTracker.getCurrentUtilization()); + + // Add to the allocation the allocation of the pending guaranteed + // containers that will start before the current container will be started. + for (Container container : queuedGuaranteedContainers.values()) { + ContainersMonitor.increaseResourceUtilization( + getContainersMonitor(), resourceAllocationToFreeUp, + container.getResource()); + if (container.getContainerId().equals(containerToStartId)) { + break; + } + } // These resources are being freed, likely at the behest of another // guaranteed container.. 
- ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(), - resourceAllocationToFreeUp, opportunisticResourcesToBeReleased); - - // Deduct any remaining resources available - Resource availableResources = utilizationTracker.getAvailableResources(); - if (availableResources.getVirtualCores() > 0 && - availableResources.getMemorySize() > 0) { - ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(), - resourceAllocationToFreeUp, availableResources); + for (Container container : oppContainersToKill.values()) { + ContainersMonitor.decreaseResourceUtilization( + getContainersMonitor(), resourceAllocationToFreeUp, + container.getResource()); } + // Subtract the overall node resources. + getContainersMonitor().subtractNodeResourcesFromResourceUtilization( + resourceAllocationToFreeUp); return resourceAllocationToFreeUp; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 9ad4f91..294eddf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -28,7 +28,5 @@ public enum ContainerSchedulerEventType { // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, CONTAINER_PAUSED, - RECOVERY_COMPLETED, - // Producer: Containers Monitor when over-allocation is on - SCHEDULE_CONTAINERS + RECOVERY_COMPLETED } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java deleted file mode 100644 index 58b73d2..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; - -/** - * Keeps track of containers utilization over time and determines how much - * resources are available to launch containers when over-allocation is on. - */ -public abstract class NMAllocationPolicy { - protected final ResourceThresholds overAllocationThresholds; - protected final ContainersMonitor containersMonitor; - - public NMAllocationPolicy( - ResourceThresholds overAllocationThresholds, - ContainersMonitor containersMonitor) { - this.containersMonitor = containersMonitor; - this.overAllocationThresholds = overAllocationThresholds; - } - - /** - * Handle container launch events. - * @param container the container that has been launched - */ - public void containerLaunched(Container container) { - - } - - /** - * Handle container release events. - * @param container the container that has been released - */ - public void containerReleased(Container container) { - - } - - /** - * Get the amount of resources to launch containers when - * over-allocation is turned on. 
- * @return the amount of resources available to launch containers - */ - public abstract Resource getAvailableResources(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 98d99c6..3c17eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -39,20 +38,22 @@ public interface ResourceUtilizationTracker { ResourceUtilization getCurrentUtilization(); /** - * Get the amount of resources currently available to launch containers. - * @return Resource resources available to launch containers + * Add Container's resources to Node Utilization. + * @param container Container. 
*/ - Resource getAvailableResources(); + void addContainerResources(Container container); /** - * Add Container's resources to Node Utilization upon container launch. + * Subtract Container's resources to Node Utilization. * @param container Container. */ - void containerLaunched(Container container); + void subtractContainerResource(Container container); /** - * Subtract Container's resources to Node Utilization upon container release. + * Check if NM has resources available currently to run the container. * @param container Container. + * @return True, if NM has resources available currently to run the container. */ - void containerReleased(Container container); + boolean hasResourcesAvailable(Container container); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java deleted file mode 100644 index f486506..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; - -/** - * An implementation of NMAllocationPolicy based on the - * snapshot of the latest containers utilization to determine how much - * resources are available * to launch containers when over-allocation - * is turned on. 
- */ -public class SnapshotBasedOverAllocationPolicy - extends NMAllocationPolicy { - - public SnapshotBasedOverAllocationPolicy( - ResourceThresholds overAllocationThresholds, - ContainersMonitor containersMonitor) { - super(overAllocationThresholds, containersMonitor); - } - - @Override - public Resource getAvailableResources() { - ResourceUtilization utilization = - containersMonitor.getContainersUtilization(true).getUtilization(); - long memoryAvailable = Math.round( - overAllocationThresholds.getMemoryThreshold() * - containersMonitor.getPmemAllocatedForContainers()) - - (utilization.getPhysicalMemory() << 20); - int vcoreAvailable = Math.round( - (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) * - containersMonitor.getVCoresAllocatedForContainers()); - return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java deleted file mode 100644 index 6f9bc82..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** -* An resource availability tracker that determines if there are resources -* available based on if there are unallocated resources or if there are -* un-utilized resources. 
-*/ -public class UtilizationBasedResourceTracker - extends AllocationBasedResourceTracker { - private static final Logger LOG = - LoggerFactory.getLogger(AllocationBasedResourceTracker.class); - - private final NMAllocationPolicy overAllocationPolicy; - - UtilizationBasedResourceTracker(ContainerScheduler scheduler) { - super(scheduler); - this.overAllocationPolicy = - getContainersMonitor().getContainerOverAllocationPolicy(); - } - - @Override - public void containerLaunched(Container container) { - super.containerLaunched(container); - if (overAllocationPolicy != null) { - overAllocationPolicy.containerLaunched(container); - } - } - - @Override - public void containerReleased(Container container) { - super.containerReleased(container); - if (overAllocationPolicy != null) { - overAllocationPolicy.containerReleased(container); - } - } - - @Override - public Resource getAvailableResources() { - Resource resourceBasedOnAllocation = getUnallocatedResources(); - Resource resourceBasedOnUtilization = - getResourcesAvailableBasedOnUtilization(); - if (LOG.isDebugEnabled()) { - LOG.debug("The amount of resources available based on allocation is " + - resourceBasedOnAllocation + ", based on utilization is " + - resourceBasedOnUtilization); - } - - return Resources.componentwiseMax(resourceBasedOnAllocation, - resourceBasedOnUtilization); - } - - /** - * Get the amount of resources based on the slack between - * the actual utilization and desired utilization. 
- * @return Resource resource available - */ - private Resource getResourcesAvailableBasedOnUtilization() { - if (overAllocationPolicy == null) { - return Resources.none(); - } - - return overAllocationPolicy.getAvailableResources(); - } - - @Override - public ResourceUtilization getCurrentUtilization() { - return getContainersMonitor().getContainersUtilization(false) - .getUtilization(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 05e9dd0..93d0afb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,7 +26,6 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; -import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -346,40 +345,6 @@ public abstract class BaseContainerManagerTest { fStates.contains(containerStatus.getState())); } - public static void waitForContainerSubState( - ContainerManagementProtocol containerManager, 
ContainerId containerID, - ContainerSubState finalState) - throws InterruptedException, YarnException, IOException { - waitForContainerSubState(containerManager, containerID, - Arrays.asList(finalState), 20); - } - public static void waitForContainerSubState( - ContainerManagementProtocol containerManager, ContainerId containerID, - List<ContainerSubState> finalStates, int timeOutMax) - throws InterruptedException, YarnException, IOException { - List<ContainerId> list = new ArrayList<>(); - list.add(containerID); - GetContainerStatusesRequest request = - GetContainerStatusesRequest.newInstance(list); - ContainerStatus containerStatus; - HashSet<ContainerSubState> fStates = new HashSet<>(finalStates); - int timeoutSecs = 0; - do { - Thread.sleep(1000); - containerStatus = - containerManager.getContainerStatuses(request) - .getContainerStatuses().get(0); - LOG.info("Waiting for container to get into one of states " + fStates - + ". Current state is " + containerStatus.getContainerSubState()); - timeoutSecs += 1; - } while (!fStates.contains(containerStatus.getContainerSubState()) - && timeoutSecs < timeOutMax); - LOG.info("Container state is " + containerStatus.getContainerSubState()); - Assert.assertTrue("ContainerSubState is not correct (timedout)", - fStates.contains(containerStatus.getContainerSubState())); - } - - public static void waitForApplicationState( ContainerManagerImpl containerManager, ApplicationId appID, ApplicationState finalState) http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index c071283..8aee532 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -288,7 +288,7 @@ public class TestContainersMonitorResourceChange { // will be 0. assertEquals( "Resource utilization must be default with MonitorThread's first run", - 0, containersMonitor.getContainersUtilization(false).getUtilization() + 0, containersMonitor.getContainersUtilization() .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))); // Verify the container utilization value. 
Since atleast one round is done, @@ -303,9 +303,8 @@ public class TestContainersMonitorResourceChange { ContainersMonitorImpl containersMonitor, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; - while (0 == containersMonitor.getContainersUtilization(false) - .getUtilization().compareTo( - ResourceUtilization.newInstance(0, 0, 0.0f))) { + while (0 == containersMonitor.getContainersUtilization() + .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) { if (timeWaiting >= timeoutMsecs) { break; } @@ -317,7 +316,7 @@ public class TestContainersMonitorResourceChange { } assertTrue("Resource utilization is not changed from second run onwards", - 0 != containersMonitor.getContainersUtilization(false).getUtilization() + 0 != containersMonitor.getContainersUtilization() .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java deleted file mode 100644 index 1e8bfdf..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests for the {@link AllocationBasedResourceTracker} class. 
- */ -public class TestAllocationBasedResourceTracker { - - private ContainerScheduler mockContainerScheduler; - - @Before - public void setup() { - mockContainerScheduler = mock(ContainerScheduler.class); - ContainersMonitor containersMonitor = - new ContainersMonitorImpl(mock(ContainerExecutor.class), - mock(AsyncDispatcher.class), mock(Context.class)); - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024); - conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); - conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f); - conf.setInt(YarnConfiguration.NM_VCORES, 8); - containersMonitor.init(conf); - when(mockContainerScheduler.getContainersMonitor()) - .thenReturn(containersMonitor); - } - - /** - * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the - * hasResourceAvailable should return false. - */ - @Test - public void testHasResourcesAvailable() { - AllocationBasedResourceTracker tracker = - new AllocationBasedResourceTracker(mockContainerScheduler); - Container testContainer = mock(Container.class); - when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); - for (int i = 0; i < 2; i++) { - Assert.assertTrue( - isResourcesAvailable(tracker.getAvailableResources(), testContainer)); - tracker.containerLaunched(testContainer); - } - Assert.assertFalse( - isResourcesAvailable(tracker.getAvailableResources(), testContainer)); - } - - private static boolean isResourcesAvailable( - Resource available, Container container) { - return available.compareTo(container.getResource()) >= 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1235cf5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java new file mode 100644 index 0000000..82c2147 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for the {@link AllocationBasedResourceUtilizationTracker} class. + */ +public class TestAllocationBasedResourceUtilizationTracker { + + private ContainerScheduler mockContainerScheduler; + + @Before + public void setup() { + mockContainerScheduler = mock(ContainerScheduler.class); + ContainersMonitor containersMonitor = + new ContainersMonitorImpl(mock(ContainerExecutor.class), + mock(AsyncDispatcher.class), mock(Context.class)); + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); + conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f); + conf.setInt(YarnConfiguration.NM_VCORES, 8); + containersMonitor.init(conf); + when(mockContainerScheduler.getContainersMonitor()) + .thenReturn(containersMonitor); + } + + /** + * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the + * hasResourceAvailable should return false. 
+ */ + @Test + public void testHasResourcesAvailable() { + AllocationBasedResourceUtilizationTracker tracker = + new AllocationBasedResourceUtilizationTracker(mockContainerScheduler); + Container testContainer = mock(Container.class); + when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); + for (int i = 0; i < 2; i++) { + Assert.assertTrue(tracker.hasResourcesAvailable(testContainer)); + tracker.addContainerResources(testContainer); + } + Assert.assertFalse(tracker.hasResourcesAvailable(testContainer)); + } + + /** + * Test the case where the current allocation has been truncated to 0.8888891 + * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return + * true. + */ + @Test + public void testHasEnoughCpu() { + AllocationBasedResourceUtilizationTracker tracker = + new AllocationBasedResourceUtilizationTracker(mockContainerScheduler); + float currentAllocation = 0.8888891f; + long totalCores = 9; + int alreadyUsedCores = 8; + Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores, + (int) totalCores - alreadyUsedCores)); + Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores, + (int) totalCores - alreadyUsedCores + 1)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org