YARN-6672. Add NM preemption of opportunistic containers when utilization goes high.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5f99f75 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5f99f75 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5f99f75 Branch: refs/heads/YARN-1011 Commit: b5f99f752be5f36e9cdfe490e1b68dc1ca745bed Parents: e0e6460 Author: Haibo Chen <haiboc...@apache.org> Authored: Mon Jul 2 15:37:47 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Mon Jul 2 15:37:47 2018 -0700 ---------------------------------------------------------------------- .../monitor/ContainersMonitorImpl.java | 85 ++- .../AllocationBasedResourceTracker.java | 5 + .../scheduler/ContainerScheduler.java | 59 +- .../scheduler/ContainerSchedulerEventType.java | 4 +- ...rSchedulerOverallocationPreemptionEvent.java | 45 ++ .../scheduler/NMAllocationPreemptionPolicy.java | 54 ++ .../scheduler/ResourceUtilizationTracker.java | 7 + ...shotBasedOverAllocationPreemptionPolicy.java | 81 +++ .../BaseContainerManagerTest.java | 126 ++++ .../TestContainerSchedulerQueuing.java | 167 +++-- ...estContainerSchedulerWithOverAllocation.java | 650 +++++++++++-------- ...shotBasedOverAllocationPreemptionPolicy.java | 259 ++++++++ 12 files changed, 1229 insertions(+), 313 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 068056d..e180a60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -20,14 +20,19 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerOverallocationPreemptionEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPreemptionPolicy; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPreemptionPolicy; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +124,7 @@ public class ContainersMonitorImpl extends AbstractService implements private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; private int overAlloctionPreemptionCpuCount = -1; + private NMAllocationPreemptionPolicy overAllocationPreemptionPolicy; private volatile boolean stopped = false; @@ -373,6 +379,9 @@ public class ContainersMonitorImpl extends AbstractService implements this.overAllocationPolicy = createOverAllocationPolicy(resourceThresholds); + this.overAllocationPreemptionPolicy = createOverAllocationPreemptionPolicy( + overAllocationPreemptionThresholds, overAlloctionPreemptionCpuCount); + LOG.info("NodeManager oversubscription enabled with overallocation " + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" + @@ -385,6 +394,12 @@ public class ContainersMonitorImpl extends AbstractService implements return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this); } + private NMAllocationPreemptionPolicy createOverAllocationPreemptionPolicy( + ResourceThresholds resourceThresholds, int maxTimesCpuOverLimit) { + return new SnapshotBasedOverAllocationPreemptionPolicy( + resourceThresholds, maxTimesCpuOverLimit, this); + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. 
" + this @@ -671,9 +686,7 @@ public class ContainersMonitorImpl extends AbstractService implements setLatestContainersUtilization(trackedContainersUtilization); // check opportunity to start containers if over-allocation is on - if (context.isOverAllocationEnabled()) { - attemptToStartContainersUponLowUtilization(); - } + checkUtilization(); // Publish the container utilization metrics to node manager // metrics system. @@ -1054,13 +1067,46 @@ public class ContainersMonitorImpl extends AbstractService implements return overAllocationPolicy; } + public NMAllocationPreemptionPolicy getOverAllocationPreemptionPolicy() { + return overAllocationPreemptionPolicy; + } + private void setLatestContainersUtilization(ResourceUtilization utilization) { this.latestContainersUtilization = new ContainersResourceUtilization( - utilization, System.currentTimeMillis()); + utilization, Time.now()); } + /** + * Check the resource utilization of the node. If the utilization is below + * the over-allocation threshold, {@link ContainerScheduler} is notified to + * launch OPPORTUNISTIC containers that are being queued to bring the + * utilization up to the over-allocation threshold. If the utilization + * is above the preemption threshold, {@link ContainerScheduler} is notified + * to preempt running OPPORTUNISTIC containers in order to bring the node + * utilization down to the preemption threshold. + * @return true if the utilization is below the over-allocation threshold or + * above the preemption threshold + * false otherwise + */ @VisibleForTesting - public void attemptToStartContainersUponLowUtilization() { + public boolean checkUtilization() { + if (context.isOverAllocationEnabled()) { + return checkLowUtilization() || checkHighUtilization(); + } + return false; + } + + /** + * Check if the node resource utilization is below the over-allocation + * threshold. 
If so, a {@link ContainerSchedulerEvent} is + * generated so that OPPORTUNISTIC containers that are being queued can + * be launched by {@link ContainerScheduler} with over-allocation and + * the node utilization can be brought up to the over-allocation threshold + * @return true if the node utilization is below the over-allocation threshold + * false otherwise + */ + private boolean checkLowUtilization() { + boolean opportunisticContainersToStart = false; if (getContainerOverAllocationPolicy() != null) { Resource available = getContainerOverAllocationPolicy() .getAvailableResources(); @@ -1069,8 +1115,37 @@ public class ContainersMonitorImpl extends AbstractService implements eventDispatcher.getEventHandler().handle( new ContainerSchedulerEvent(null, ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); + opportunisticContainersToStart = true; + LOG.info("Node utilization is below its over-allocation threshold. " + + "Inform container scheduler to launch opportunistic containers."); } } + return opportunisticContainersToStart; + } + + /** + * Check if the node resource utilization is over the preemption threshold. + * If so, a {@link ContainerSchedulerOverallocationPreemptionEvent} is + * generated so that OPPORTUNISTIC containers can be preempted by + * {@link ContainerScheduler} to reclaim resources in order to bring the + * node utilization down to the preemption threshold. + * @return true if the node utilization is over the preemption threshold + * false otherwise + */ + private boolean checkHighUtilization() { + ResourceUtilization overLimit = getOverAllocationPreemptionPolicy() + .getResourcesToReclaim(); + + boolean opportunisticContainersToPreempt = false; + + if (overLimit.getPhysicalMemory() > 0 || overLimit.getCPU() > 0) { + opportunisticContainersToPreempt = true; + eventDispatcher.getEventHandler().handle( + new ContainerSchedulerOverallocationPreemptionEvent(overLimit)); + LOG.info("Node utilization is over the preemption threshold. 
" + + "Inform container scheduler to reclaim {}", overLimit); + } + return opportunisticContainersToPreempt; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java index a3e19eb..7840fef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java @@ -53,6 +53,11 @@ public class AllocationBasedResourceTracker */ @Override public ResourceUtilization getCurrentUtilization() { + return getTotalAllocation(); + } + + @Override + public ResourceUtilization getTotalAllocation() { return this.containersAllocation; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 304c2b8..947c096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -219,12 +219,60 @@ public class ContainerScheduler extends AbstractService implements // node utilization is low. startOpportunisticContainers(utilizationTracker.getAvailableResources()); break; + case PREEMPT_CONTAINERS: + if (event instanceof ContainerSchedulerOverallocationPreemptionEvent) { + preemptOpportunisticContainers( + (ContainerSchedulerOverallocationPreemptionEvent) event); + } else { + LOG.error( + "Unknown event type on Preempt containers: {}", event.getType()); + } + break; default: - LOG.error("Unknown event arrived at ContainerScheduler: " - + event.toString()); + LOG.error("Unknown event arrived at ContainerScheduler: {}", event); } } + private void preemptOpportunisticContainers( + ContainerSchedulerOverallocationPreemptionEvent event) { + ResourceUtilization resourcesToReclaim = + getResourcesToReclaim(event.getResourcesOverPreemptionThresholds()); + + List<Container> oppContainersToReclaim = + pickOpportunisticContainersToReclaimResources( + resourcesToReclaim); + + killOpportunisticContainers(oppContainersToReclaim); + } + + /** + * Get the amount of resources that need to be reclaimed by preempting + * OPPORTUNISTIC containers considering the amount of 
resources that + * are over the preemption thresholds and over the capacity of the node. + * When the node is not being over-allocated, its resource utilization + * can safely go to 100% without any OPPORTUNISTIC containers being killed. + */ + private ResourceUtilization getResourcesToReclaim( + ResourceUtilization resourcesOverPreemptionThresholds) { + ResourceUtilization totalAllocation = ResourceUtilization.newInstance( + utilizationTracker.getTotalAllocation()); + getContainersMonitor().subtractNodeResourcesFromResourceUtilization( + totalAllocation); + ResourceUtilization overAllocatedResources = + ResourceUtilization.newInstance( + Math.max(0, totalAllocation.getPhysicalMemory()), + Math.max(0, totalAllocation.getVirtualMemory()), + Math.max(0, totalAllocation.getCPU())); + + return ResourceUtilization.newInstance( + Math.min(overAllocatedResources.getPhysicalMemory(), + resourcesOverPreemptionThresholds.getPhysicalMemory()), + Math.min(overAllocatedResources.getVirtualMemory(), + resourcesOverPreemptionThresholds.getVirtualMemory()), + Math.min(overAllocatedResources.getCPU(), + resourcesOverPreemptionThresholds.getCPU())); + } + /** * We assume that the ContainerManager has already figured out what kind * of update this is. 
@@ -593,8 +641,9 @@ public class ContainerScheduler extends AbstractService implements @SuppressWarnings("unchecked") private void reclaimOpportunisticContainerResources() { + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); List<Container> extraOppContainersToReclaim = - pickOpportunisticContainersToReclaimResources(); + pickOpportunisticContainersToReclaimResources(resourcesToFreeUp); killOpportunisticContainers(extraOppContainersToReclaim); } @@ -634,12 +683,12 @@ public class ContainerScheduler extends AbstractService implements container.sendLaunchEvent(); } - private List<Container> pickOpportunisticContainersToReclaimResources() { + private List<Container> pickOpportunisticContainersToReclaimResources( + ResourceUtilization resourcesToFreeUp) { // The opportunistic containers that need to be killed for the // given container to start. List<Container> extraOpportContainersToKill = new ArrayList<>(); // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); // Go over the running opportunistic containers. // Use a descending iterator to kill more recently started containers. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 9ad4f91..f76727d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -30,5 +30,7 @@ public enum ContainerSchedulerEventType { CONTAINER_PAUSED, RECOVERY_COMPLETED, // Producer: Containers Monitor when over-allocation is on - SCHEDULE_CONTAINERS + SCHEDULE_CONTAINERS, + // Producer: Containers Monitor when over-allocation is on + PREEMPT_CONTAINERS } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java new file mode 100644 index 0000000..547036a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; + +/** + * A {@link ContainerSchedulerEvent} generated by {@link ContainersMonitorImpl} + * when overallocation is turned on and the utilization goes over the + * proactive preemption thresholds. 
+ */ +public class ContainerSchedulerOverallocationPreemptionEvent + extends ContainerSchedulerEvent { + private final ResourceUtilization resourcesOverPreemptionThresholds; + /** + * Create instance of Event. + * + * @param toFree resource to free up. + */ + public ContainerSchedulerOverallocationPreemptionEvent( + ResourceUtilization toFree) { + super(null, ContainerSchedulerEventType.PREEMPT_CONTAINERS); + this.resourcesOverPreemptionThresholds = toFree; + } + + public ResourceUtilization getResourcesOverPreemptionThresholds() { + return resourcesOverPreemptionThresholds; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java new file mode 100644 index 0000000..61f6298 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * Keeps track of containers utilization over time and determines how many + * resources need to be reclaimed by preempting opportunistic containers + * when over-allocation is turned on. + */ +public abstract class NMAllocationPreemptionPolicy { + private final ResourceThresholds overAllocationPreemptionThresholds; + private final ContainersMonitor containersMonitor; + + public NMAllocationPreemptionPolicy( + ResourceThresholds preemptionThresholds, + ContainersMonitor containersMonitor) { + this.containersMonitor = containersMonitor; + this.overAllocationPreemptionThresholds = preemptionThresholds; + } + + /** + * Get the amount of resources to reclaim by preempting opportunistic + * containers when over-allocation is turned on. 
+ * @return the amount of resources to be reclaimed + */ + public abstract ResourceUtilization getResourcesToReclaim(); + + public ContainersMonitor getContainersMonitor() { + return containersMonitor; + } + + public ResourceThresholds getOverAllocationPreemptionThresholds() { + return overAllocationPreemptionThresholds; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 7a7c78e..9be7574 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -39,6 +39,13 @@ public interface ResourceUtilizationTracker { ResourceUtilization getCurrentUtilization(); /** + * Get the total amount of resources allocated to running containers + * in terms of resource utilization. + * @return ResourceUtilization resource allocation + */ + ResourceUtilization getTotalAllocation(); + + /** * Get the amount of resources currently available to launch containers. 
* @return Resource resources available to launch containers */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java new file mode 100644 index 0000000..188a108 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * An implementation of {@link NMAllocationPreemptionPolicy} based on the + * snapshot of the latest containers utilization to determine how many + * resources need to be reclaimed by preempting opportunistic containers + * when over-allocation is turned on. + */ +public class SnapshotBasedOverAllocationPreemptionPolicy + extends NMAllocationPreemptionPolicy { + private final int absoluteMemoryPreemptionThresholdMb; + private final float cpuPreemptionThreshold; + private final int maxTimesCpuOverPreemption; + private int timesCpuOverPreemption; + + public SnapshotBasedOverAllocationPreemptionPolicy( + ResourceThresholds preemptionThresholds, + int timesCpuOverPreemptionThreshold, + ContainersMonitor containersMonitor) { + super(preemptionThresholds, containersMonitor); + int memoryCapacityMb = (int) + (containersMonitor.getPmemAllocatedForContainers() / (1024 * 1024)); + this.absoluteMemoryPreemptionThresholdMb = (int) + (preemptionThresholds.getMemoryThreshold() * memoryCapacityMb); + this.cpuPreemptionThreshold = preemptionThresholds.getCpuThreshold(); + this.maxTimesCpuOverPreemption = timesCpuOverPreemptionThreshold; + } + + @Override + public ResourceUtilization getResourcesToReclaim() { + ResourceUtilization utilization = + getContainersMonitor().getContainersUtilization(true).getUtilization(); + + int memoryOverLimit = utilization.getPhysicalMemory() - + absoluteMemoryPreemptionThresholdMb; + float vcoreOverLimit = utilization.getCPU() - cpuPreemptionThreshold; + + if (vcoreOverLimit > 0) { + timesCpuOverPreemption++; + if (timesCpuOverPreemption > maxTimesCpuOverPreemption) { + timesCpuOverPreemption = 0; + } else { + // report no over limit 
for cpu if # of times CPU is over the preemption + // threshold is not greater than the max number of times allowed + vcoreOverLimit = 0; + } + } else { + // reset the counter when cpu utilization goes under the preemption + // threshold before the max times allowed is reached + timesCpuOverPreemption = 0; + } + + // sanitize so that zero is returned if the utilization is below + // the preemption threshold + vcoreOverLimit = Math.max(0, vcoreOverLimit); + memoryOverLimit = Math.max(0, memoryOverLimit); + + return ResourceUtilization.newInstance(memoryOverLimit, 0, vcoreOverLimit); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 05e9dd0..4dedf96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,7 +26,12 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ContainerSubState; +import 
org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -534,4 +539,125 @@ public abstract class BaseContainerManagerTest { ContainerId.newContainerId(appAttemptId, cId); return containerId; } + + /** + * A test implementation of {@link ContainersMonitor} that allows control of + * the current resource utilization. + * + * <p>Container allocations are fixed at NM_CONTAINERS_MEMORY_MB of memory + * and NM_CONTAINERS_VCORES vcores, and the reported container utilization + * is whatever was last passed to + * {@link #setContainerResourceUsage(ResourceUtilization)}. + */ + protected static class ContainerMonitorForTest + extends ContainersMonitorImpl { + private static final int NM_CONTAINERS_VCORES = 4; + private static final int NM_CONTAINERS_MEMORY_MB = 2048; + + // NOTE(review): written via setContainerResourceUsage() from the test + // thread and read in getContainersUtilization(); if that read happens on + // another NM thread, consider making this field volatile. + private ResourceUtilization containerResourceUsage = + ResourceUtilization.newInstance(0, 0, 0.0f); + + ContainerMonitorForTest(ContainerExecutor exec, + AsyncDispatcher dispatcher, Context context) { + super(exec, dispatcher, context); + } + + @Override + public long getPmemAllocatedForContainers() { + // MB to bytes; the trailing 1024L makes the last multiplication long. + return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L; + } + + @Override + public long getVmemAllocatedForContainers() { + float pmemRatio = getConfig().getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + return (long) (pmemRatio * getPmemAllocatedForContainers()); + } + + @Override + public long getVCoresAllocatedForContainers() { + return NM_CONTAINERS_VCORES; + } + + @Override + public ContainersResourceUtilization getContainersUtilization( + boolean latest) { + // The latest flag is ignored: always report the test-controlled usage, + // timestamped with the current time. + return new ContainersMonitor.ContainersResourceUtilization( + containerResourceUsage, Time.now()); + } + + @Override + protected void checkOverAllocationPrerequisites() { + // do not check: the over-allocation prerequisites are intentionally + // not verified in tests + } + + /** + * Set the container utilization reported by subsequent calls to + * {@link #getContainersUtilization(boolean)}. + */ + public void setContainerResourceUsage( + ResourceUtilization containerResourceUsage) { + this.containerResourceUsage = containerResourceUsage; + } + } + + /** 
+ * A test implementation of {@link ContainerManager} that allows its + * internal event queue to be drained for synchronization purposes, + * and an out-of-band check of the node resource utilization so that + * the node utilization can stay between the over-allocation threshold + * and the preemption threshold. + */ + protected static class ContainerManagerForTest + extends ContainerManagerImpl { + + private final String user; + + public ContainerManagerForTest( + Context context, ContainerExecutor exec, + DeletionService deletionContext, + NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler, String user) { + super(context, exec, deletionContext, + nodeStatusUpdater, metrics, dirsHandler); + this.user = user; + } + + @Override + protected UserGroupInformation getRemoteUgi() throws YarnException { + // Build a remote user for a fixed application attempt and attach an + // NMTokenIdentifier for the configured user. + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context + .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() + .getKeyId())); + return ugi; + } + + @Override + protected AsyncDispatcher createDispatcher() { + // A DrainDispatcher lets drainAsyncEvents() wait on the event queue. + return new DrainDispatcher(); + } + + @Override + protected ContainersMonitor createContainersMonitor( + ContainerExecutor exec) { + // Use the test monitor so tests control the reported utilization. + return new ContainerMonitorForTest(exec, dispatcher, context); + } + + /** + * Check the node resource utilization out-of-band. If the utilization is + * below the over-allocation threshold, queued OPPORTUNISTIC containers + * will be launched to bring the node utilization up to the over-allocation + * threshold. If the utilization is above the preemption threshold, running + * OPPORTUNISTIC containers will be killed to bring the utilization down to + * the preemption threshold.
+ * + * <p>The internal event queue is drained before this method returns; + * see {@link #drainAsyncEvents()}. + */ + public void checkNodeResourceUtilization() { + ((ContainerMonitorForTest) getContainersMonitor()).checkUtilization(); + drainAsyncEvents(); + } + + /** + * Drain the internal event queue. The dispatcher is expected to be the + * DrainDispatcher created by {@link #createDispatcher()}. + */ + public void drainAsyncEvents() { + ((DrainDispatcher) dispatcher).await(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f99f75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 70066c6..d5c09b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.base.Supplier; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; @@ -38,18 +37,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ConfigurationException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; @@ -65,8 +61,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -131,47 +125,8 @@ public class 
TestContainerSchedulerQueuing extends BaseContainerManagerTest { @Override protected ContainerManagerImpl createContainerManager( DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() - .getKeyId())); - return ugi; - } - - @Override - protected ContainersMonitor createContainersMonitor( - ContainerExecutor exec) { - return new ContainersMonitorImpl(exec, dispatcher, this.context) { - // Define resources available for containers to be executed. - @Override - public long getPmemAllocatedForContainers() { - return 2048 * 1024 * 1024L; - } - - @Override - public long getVmemAllocatedForContainers() { - float pmemRatio = getConfig().getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - return (long) (pmemRatio * getPmemAllocatedForContainers()); - } - - @Override - public long getVCoresAllocatedForContainers() { - return 4; - } - }; - } - }; + // use the shared test container manager so that tests can control and + // check the node resource utilization + return new ContainerManagerForTest(context, exec, delSrvc, + nodeStatusUpdater, metrics, dirsHandler, user); } @Override @@ -393,6 +348,37 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { containerScheduler.getNumQueuedGuaranteedContainers()); Assert.assertEquals(2, containerScheduler.getNumQueuedOpportunisticContainers()); + + // only one container, which requested 2048 MB of memory and 1 vcore, + // is running, and its resource utilization is zero.
+ // check the node resource utilization; the two OPPORTUNISTIC containers + // that are being queued should stay in the queue because over-allocation + // is turned off. + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage( + ResourceUtilization.newInstance(0, 0, 0.0f)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + ContainerId containerId0 = createContainerId(0); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(containerId0)) { + Assert.assertEquals(ContainerSubState.RUNNING, + status.getContainerSubState()); + } else { + Assert.assertEquals(ContainerSubState.SCHEDULED, + status.getContainerSubState()); + } + } + containerScheduler = containerManager.getContainerScheduler(); + // Ensure two containers are properly queued. + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(2, + containerScheduler.getNumQueuedOpportunisticContainers()); } /** @@ -476,6 +462,91 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { } /** + * Start two OPPORTUNISTIC containers which together ask for all + * the allocations available on the node. When the node resource + * utilization goes over the preemption thresholds, neither of + * the containers should be preempted because there is no + * over-allocation at the moment and they can safely use up all + * the resources available on the node. + */ + @Test + public void testNoOpportunisticContainerPreemptionUponHighUtilization() + throws Exception { + containerManager.start(); + + // start two OPPORTUNISTIC containers that together take up + // all allocations of the node. The two can be launched immediately + // because there is enough free allocation.
When they use up + // all their resource allocations, that is, the node is fully + // utilized, none of the OPPORTUNISTIC containers shall be killed + // because the node is not being over-allocated. + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 2), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 2), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + // the two OPPORTUNISTIC containers shall be launched immediately + // because there is just enough allocation to launch them both. + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(0), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(1), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + + // Ensure all containers are running.
+ List<ContainerId> statList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + + // we have two containers running, both of which are using all of + // their allocations. The node is being fully utilized in terms + // of both memory and CPU. + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage( + ResourceUtilization.newInstance(2048, 0, 1.0f)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // the two running OPPORTUNISTIC containers shall continue to run + // because when the node is not being over-allocated, it is safe to + // let the containers use up all the resources, so no OPPORTUNISTIC + // containers shall be preempted. + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + } + + /** * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources * requests by each container as such that only one can run in parallel. * Thus, the OPPORTUNISTIC container that started running, will be --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org