Repository: hadoop Updated Branches: refs/heads/YARN-1011 c6acfc44d -> 1411874be
YARN-6794. Fair Scheduler to explicitly promote OPPORTUNISTIC containers locally at the node where they're running. Contributed by Haibo Chen. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1411874b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1411874b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1411874b Branch: refs/heads/YARN-1011 Commit: 1411874be8a5296e76d837d3d9c23113902db9ce Parents: c6acfc4 Author: Miklos Szegedi <szege...@apache.org> Authored: Wed Jun 20 10:53:20 2018 -0700 Committer: Miklos Szegedi <szege...@apache.org> Committed: Wed Jun 20 10:53:20 2018 -0700 ---------------------------------------------------------------------- .../scheduler/SchedulerApplicationAttempt.java | 3 +- .../scheduler/SchedulerNode.java | 27 + .../scheduler/fair/FSAppAttempt.java | 32 +- .../scheduler/fair/FSSchedulerNode.java | 57 +- .../scheduler/fair/FairScheduler.java | 47 +- .../TestResourceTrackerService.java | 3 +- .../scheduler/fair/TestFairScheduler.java | 569 +++++++++++++++++++ 7 files changed, 715 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 60257b1..4b21526 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -671,7 +671,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } public Resource getCurrentConsumption() { - return attemptResourceUsage.getUsed(); + return Resources.add(attemptResourceUsage.getUsed(), + attemptOpportunisticResourceUsage.getUsed()); } private Container updateContainerAndNMToken(RMContainer rmContainer, http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 7da01fd..d35ef96 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -276,6 +276,33 @@ public abstract class SchedulerNode { return true; } + /** + * Attempt to promote an OPPORTUNISTIC container that has been allocated. + * @param rmContainer the OPPORTUNISTIC container to promote + * @return true if the given OPPORTUNISTIC container is promoted, + * false otherwise + */ + public synchronized boolean tryToPromoteOpportunisticContainer( + RMContainer rmContainer) { + assert (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC); + + boolean promoted = false; + Resource resource = rmContainer.getContainer().getResource(); + if (allocatedContainers.containsKey(rmContainer.getContainerId()) && + Resources.fitsIn(resource, getUnallocatedResource())) { + Resources.subtractFrom(allocatedResourceOpportunistic, resource); + numOpportunisticContainers--; + + Resources.addTo(allocatedResourceGuaranteed, resource); + numGuaranteedContainers++; + Resources.subtractFrom(unallocatedResource, resource); + + promoted = true; + } + + return promoted; + } + /** * Get resources that are not allocated to GUARANTEED containers on the node. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 32998b6..a7cf030 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -505,6 +505,29 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } /** + * Update resource accounting upon promotion of an OPPORTUNISTIC container. + * @param rmContainer the OPPORTUNISTIC container that has been promoted + */ + public void opportunisticContainerPromoted(RMContainer rmContainer) { + // only an OPPORTUNISTIC container can be promoted + assert (ExecutionType.OPPORTUNISTIC == rmContainer.getExecutionType()); + + // the container to be promoted must belong to the current app attempt + if (rmContainer.getApplicationAttemptId().equals( + getApplicationAttemptId())) { + Resource resource = rmContainer.getContainer().getResource(); + try { + writeLock.lock(); + attemptOpportunisticResourceUsage.decUsed(resource); + attemptResourceUsage.incUsed(resource); + getQueue().incUsedGuaranteedResource(resource); + } finally { + writeLock.unlock(); + } + } + } + + /** * Should be called when the scheduler assigns a container at a higher * degree of locality than the current threshold. 
Reset the allowed locality * level to a higher degree of locality. @@ -1160,7 +1183,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * * @param node * Node that the application has an existing reservation on - * @return whether the reservation on the given node is valid. + * @return true if the reservation is turned into an allocation */ boolean assignReservedContainer(FSSchedulerNode node) { RMContainer rmContainer = node.getReservedContainer(); @@ -1187,8 +1210,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (Resources.fitsIn(node.getReservedContainer().getReservedResource(), node.getUnallocatedResource())) { assignContainer(node, false, true); + return true; } - return true; + return false; } /** @@ -1356,12 +1380,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt @Override public Resource getGuaranteedResourceUsage() { - return getCurrentConsumption(); + return Resources.clone(attemptResourceUsage.getUsed()); } @Override public Resource getOpportunisticResourceUsage() { - return attemptOpportunisticResourceUsage.getUsed(); + return Resources.clone(attemptOpportunisticResourceUsage.getUsed()); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index a53dda4..efbe615 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -35,10 +36,13 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -66,6 +70,13 @@ public class FSSchedulerNode extends SchedulerNode { // slated for preemption private Resource totalResourcesPreempted = Resource.newInstance(0, 0); + // The set of containers that need to be handled before resource + // 
available on the node can be assigned to resource requests. + // This is a queue of reserved and opportunistic containers on + // the node. + private final LinkedHashSet<RMContainer> priorityContainers = + new LinkedHashSet(1); + @VisibleForTesting public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { super(node, usePortForNodeName); @@ -124,6 +135,7 @@ public class FSSchedulerNode extends SchedulerNode { + application.getApplicationId()); } setReservedContainer(container); + priorityContainers.add(container); this.reservedAppSchedulable = (FSAppAttempt) application; } @@ -142,7 +154,7 @@ public class FSSchedulerNode extends SchedulerNode { " for application " + reservedApplication.getApplicationId() + " on node " + this); } - + priorityContainers.remove(getReservedContainer()); setReservedContainer(null); this.reservedAppSchedulable = null; } @@ -274,6 +286,13 @@ public class FSSchedulerNode extends SchedulerNode { } else { LOG.error("Allocated empty container" + rmContainer.getContainerId()); } + + // keep track of opportunistic containers allocated so that we can promote + // them before we assign resources available to resource requests. + if (ExecutionType.OPPORTUNISTIC.equals( + rmContainer.getContainer().getExecutionType())) { + priorityContainers.add(rmContainer); + } } /** @@ -292,4 +311,40 @@ public class FSSchedulerNode extends SchedulerNode { containersForPreemption.remove(container); } } + + /** + * Try to assign available resources to the reserved container and to + * opportunistic containers that have been allocated. + * @return the list of opportunistic containers that have been promoted + */ + public synchronized List<RMContainer> handlePriorityContainers() { + boolean assigned = true; + List<RMContainer> promotedContainers = new ArrayList<>(0); + + List<RMContainer> candidateContainers = new ArrayList<>(priorityContainers); + for (RMContainer rmContainer : candidateContainers) { + boolean isReservedContainer = + rmContainer.getReservedSchedulerKey() != null; + if (isReservedContainer) { + // attempt to assign resources that have been reserved + FSAppAttempt reservedApp = getReservedAppSchedulable(); + if (reservedApp != null) { + reservedApp.assignReservedContainer(this); + } + } else { + if (super.tryToPromoteOpportunisticContainer(rmContainer)) { + priorityContainers.remove(rmContainer); + assigned = true; + promotedContainers.add(rmContainer); + } + } + + if (!assigned) { + // break out of the loop because assigned being false indicates + // there are no more resources available for promotion.
+ break; + } + } + return promotedContainers; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e6c0d9d..d200588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -102,6 +104,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -1108,8 +1111,8 @@ public class FairScheduler extends // Assign new containers... // 1. Ensure containers are assigned to the apps that preempted - // 2. Check for reserved applications - // 3. Schedule GUARANTEED containers if there are no reservations + // 2. Check for reserved applications or promote OPPORTUNISTIC containers + // 3. Schedule GUARANTEED containers // 4. Schedule OPPORTUNISTIC containers if possible // Apps may wait for preempted containers @@ -1118,12 +1121,14 @@ public class FairScheduler extends // when C does not qualify for preemption itself. 
attemptToAssignPreemptedResources(node); - boolean validReservation = attemptToAssignReservedResources(node); - if (!validReservation) { - // only attempt to assign GUARANTEED containers if there is no - // reservation on the node because - attemptToAssignResourcesAsGuaranteedContainers(node); - } + // before we assign resources to outstanding resource requests, we + // need to assign the resources to either the container that has + // made a reservation or allocated OPPORTUNISTIC containers so that + // they can be promoted. This ensures that resource requests that + // are eligible for guaranteed resources are satisfied in FIFO order + attemptToAssignReservedResourcesOrPromoteOpportunisticContainers(node); + + attemptToAssignResourcesAsGuaranteedContainers(node); // attempt to assign OPPORTUNISTIC containers regardless of whether // we have made a reservation or assigned a GUARANTEED container @@ -1138,15 +1143,27 @@ } /** - * Assign the reserved resource to the application that have reserved it. + * Attempt to assign reserved resources and promote OPPORTUNISTIC containers + * that have already been allocated. */ - private boolean attemptToAssignReservedResources(FSSchedulerNode node) { - boolean success = false; - FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); - if (reservedAppSchedulable != null) { - success = reservedAppSchedulable.assignReservedContainer(node); + private void attemptToAssignReservedResourcesOrPromoteOpportunisticContainers( + FSSchedulerNode node) { + Map<Container, ContainerUpdateType> promotion = new HashMap<>(0); + + List<RMContainer> promoted = node.handlePriorityContainers(); + for (RMContainer rmContainer : promoted) { + FSAppAttempt appAttempt = getSchedulerApp( + rmContainer.getApplicationAttemptId()); + appAttempt.opportunisticContainerPromoted(rmContainer); + + promotion.put(rmContainer.getContainer(), + ContainerUpdateType.PROMOTE_EXECUTION_TYPE); + } + + if (!promotion.isEmpty()) { + rmContext.getDispatcher().getEventHandler().handle( + new RMNodeUpdateContainerEvent(node.getNodeID(), promotion)); } - return success; } private void attemptToAssignResourcesAsGuaranteedContainers( http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index fa0f5fd..cd6878e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -2221,9 +2221,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rc.getExecutionType()); } - // Should only include GUARANTEED resources currentConsumption = applicationAttempt.getCurrentConsumption(); -
Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption); + Assert.assertEquals(Resource.newInstance(5120, 3), currentConsumption); allocResources = applicationAttempt.getQueue().getMetrics().getAllocatedResources(); Assert.assertEquals(Resource.newInstance(2048, 1), allocResources); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1411874b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5878ccd..7fbf84a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3353,6 +3353,575 @@ public class TestFairScheduler extends FairSchedulerTestBase { } } + /** + * Test promotion of a single OPPORTUNISTIC container when no resources are + * reserved on the node where the container is allocated. + */ + @Test + public void testSingleOpportunisticContainerPromotionWithoutReservation() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + "yarn.resource-types.memory-mb.increment-allocation", 1024); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create two scheduling requests that leave no unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers2.get(0).getExecutionType()); + + // node utilization is low after the two container run on the node + ContainerStatus container1Status = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + ContainerStatus container2Status = ContainerStatus.newInstance( + allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + List<ContainerStatus> containerStatuses = new ArrayList<>(2); + containerStatuses.add(container1Status); + containerStatuses.add(container2Status); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(containerStatuses, Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + List<Container> allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers3.get(0).getExecutionType()); + assertTrue("No reservation should be made for the third request", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + + // now the first GUARANTEED container finishes + List<ContainerStatus> finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers1.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + + // the OPPORTUNISTIC container should be promoted + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). 
+ getOpportunisticResourceUsage().getMemorySize()); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", + memoryAllocationIncrement); + } + } + + /** + * Test promotion of two OPPORTUNISTIC containers when no resources are + * reserved on the node where the container is allocated. + */ + @Test + public void testMultipleOpportunisticContainerPromotionWithoutReservation() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + "yarn.resource-types.memory-mb.increment-allocation", 1024); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create two scheduling requests that leave no unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers2.get(0).getExecutionType()); + + // node utilization is low after the two container run on the node + ContainerStatus container1Status = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + ContainerStatus container2Status = ContainerStatus.newInstance( + allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + List<ContainerStatus> containerStatuses = new ArrayList<>(2); + containerStatuses.add(container1Status); + containerStatuses.add(container2Status); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(containerStatuses, Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1536, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1536, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers3.get(0).getExecutionType()); + assertTrue("No reservation should be made for the third request", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + + // node utilization is low after the third container run on the node + ContainerStatus container3Status = ContainerStatus.newInstance( + allocatedContainers3.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(container3Status), + Collections.emptyList()), + ResourceUtilization.newInstance(2000, 0, 0.2f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt4 = + createSchedulingRequest(1024, "queue3", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers4 = + scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers4.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers4.get(0).getExecutionType()); + + // now the first GUARANTEED container finishes + List<ContainerStatus> finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers1.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + // the first OPPORTUNISTIC container should be promoted + assertEquals(1536, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + // the second OPPORTUNISTIC container should not be promoted + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getGuaranteedResourceUsage().getMemorySize()); + + // now the second GUARANTEED container finishes + finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers2.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(3000, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + // the second OPPORTUNISTIC container should be promoted + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getGuaranteedResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", + memoryAllocationIncrement); + } + } + + /** + * Test promotion of OPPORTUNISTIC container when there are resources + * reserved before the container is allocated. The scheduler should + * satisfy the reservation first before it promotes the OPPORTUNISTIC + * container when resources are released. 
+ */ + @Test + public void testOpportunisticContainerPromotionWithPriorReservation() + throws Exception { + + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + "yarn.resource-types.memory-mb.increment-allocation", 1024); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create two scheduling requests that leave no unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers2.get(0).getExecutionType()); + + // node utilization is low after the two container run on the node + ContainerStatus container1Status = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + ContainerStatus container2Status = ContainerStatus.newInstance( + allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + List<ContainerStatus> containerStatuses = new ArrayList<>(2); + containerStatuses.add(container1Status); + containerStatuses.add(container2Status); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(containerStatuses, Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that opts out of oversubscription + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(2000, "queue2", "user1", 1, true); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 0); + // verify that a reservation is made for the second request + assertTrue("A reservation should be made for the third request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(2000, 1))); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt4 = + createSchedulingRequest(1024, "queue3", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + List<Container> allocatedContainers4 = + scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers4.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers4.get(0).getExecutionType()); + assertTrue("A reservation should still be made for the second request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(2000, 1))); + + // now the first GUARANTEED container finishes + List<ContainerStatus> finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers1.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + + // the reserved container of the third request that opted out of + // oversubscription should now be satisfied with a GUARANTEED container + assertEquals(2000, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers3.get(0).getExecutionType()); + assertTrue("The reservation for the third request should be canceled", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + // the OPPORTUNISTIC container should not be promoted given the released + // resources are taken by handling the reservation + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + + // now the second GUARANTEED container finishes + finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers2.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(3000, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + + // the OPPORTUNISTIC container should be promoted + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getGuaranteedResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). 
+ getOpportunisticResourceUsage().getMemorySize()); + + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", + memoryAllocationIncrement); + } + } + + /** + * Test promotion of OPPORTUNISTIC container when there are resources + * reserved after the container is allocated. The scheduler should + * promote the OPPORTUNISTIC container before it satisfies the reservation + * when resources are released. + */ + @Test + public void testOpportunisticContainerPromotionWithPostReservation() + throws Exception { + + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + "yarn.resource-types.memory-mb.increment-allocation", 1024); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create two scheduling requests that leave no unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(2048, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers2.get(0).getExecutionType()); + + // node utilization is low after the two container run on the node + ContainerStatus container1Status = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + ContainerStatus container2Status = ContainerStatus.newInstance( + allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + List<ContainerStatus> containerStatuses = new ArrayList<>(2); + containerStatuses.add(container1Status); + containerStatuses.add(container2Status); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(containerStatuses, Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + List<Container> allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers3.get(0).getExecutionType()); + assertTrue("No reservation should be made for the third request", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + + // create another scheduling request that opts out of oversubscription + ApplicationAttemptId appAttempt4 = + createSchedulingRequest(2000, "queue3", "user1", 1, true); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers4 = + scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers4.size() == 0); + // verify that a reservation is made for the second request + assertTrue("A reservation should be made for the fourth request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(2000, 1))); + + // now the first GUARANTEED container finishes + List<ContainerStatus> finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers1.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + + // the OPPORTUNISTIC container should be promoted + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). 
+ getOpportunisticResourceUsage().getMemorySize()); + assertTrue("A reservation should still be made for the fourth request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(2000, 1))); + + // now the second GUARANTEED container finishes + finishedContainers = Collections.singletonList( + ContainerStatus.newInstance(allocatedContainers2.get(0).getId(), + ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS)); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.emptyList(), finishedContainers), + ResourceUtilization.newInstance(3000, 0, 0.1f)); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + + // the reserved container of the fourth request that opted out of + // oversubscription should now be satisfied with a GUARANTEED container + assertEquals(2000, scheduler.getQueueManager().getQueue("queue3"). + getGuaranteedResourceUsage().getMemorySize()); + allocatedContainers4 = + scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers4.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers4.get(0).getExecutionType()); + assertTrue("The reservation for the fourth request should be canceled", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt("yarn.resource-types.memory-mb.increment-allocation", + memoryAllocationIncrement); + } + } + @Test public void testAclSubmitApplication() throws Exception { // Set acl's
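----------------------------------------------------------------------

For readers who want the gist of the change without walking the full diff: the standalone sketch below (not part of the commit; Resource and Node are simplified stand-ins rather than the real YARN classes) mirrors the bookkeeping that SchedulerNode#tryToPromoteOpportunisticContainer introduces. An OPPORTUNISTIC container is promoted in place on the node where it already runs, but only once it fits in the node's unallocated capacity; its resources then move from the opportunistic to the guaranteed accounting. In the patch, FSSchedulerNode#handlePriorityContainers drives this check, together with any outstanding reservation, whenever the node reports freed resources.

// Standalone illustration only -- the Resource and Node classes below are
// simplified stand-ins for the YARN types; the real logic lives in
// SchedulerNode#tryToPromoteOpportunisticContainer.
public class PromotionSketch {

  /** Simplified stand-in for org.apache.hadoop.yarn.api.records.Resource. */
  static final class Resource {
    final long memoryMb;
    final int vcores;

    Resource(long memoryMb, int vcores) {
      this.memoryMb = memoryMb;
      this.vcores = vcores;
    }

    boolean fitsIn(Resource available) {
      return memoryMb <= available.memoryMb && vcores <= available.vcores;
    }

    Resource plus(Resource r) {
      return new Resource(memoryMb + r.memoryMb, vcores + r.vcores);
    }

    Resource minus(Resource r) {
      return new Resource(memoryMb - r.memoryMb, vcores - r.vcores);
    }

    @Override
    public String toString() {
      return "<memory: " + memoryMb + " MB, vcores: " + vcores + ">";
    }
  }

  /** Simplified stand-in for a scheduler node's resource bookkeeping. */
  static final class Node {
    Resource unallocated;
    Resource allocatedGuaranteed = new Resource(0, 0);
    Resource allocatedOpportunistic = new Resource(0, 0);

    Node(Resource unallocated) {
      this.unallocated = unallocated;
    }

    /**
     * Promote an OPPORTUNISTIC container in place on this node. Promotion
     * succeeds only if the container now fits in the unallocated capacity;
     * its resources then move from the opportunistic to the guaranteed
     * bucket and are charged against the unallocated pool.
     */
    boolean tryToPromote(Resource container) {
      if (!container.fitsIn(unallocated)) {
        return false; // no guaranteed headroom yet; stays OPPORTUNISTIC
      }
      allocatedOpportunistic = allocatedOpportunistic.minus(container);
      allocatedGuaranteed = allocatedGuaranteed.plus(container);
      unallocated = unallocated.minus(container);
      return true;
    }
  }

  public static void main(String[] args) {
    // A node that is fully allocated to GUARANTEED containers but runs one
    // 1 GB / 1 vcore OPPORTUNISTIC container via oversubscription.
    Node node = new Node(new Resource(0, 0));
    Resource opportunistic = new Resource(1024, 1);
    node.allocatedOpportunistic = opportunistic;

    System.out.println("promoted while node is full: "
        + node.tryToPromote(opportunistic));            // false

    // A GUARANTEED container finishes and releases 2 GB / 1 vcore, so the
    // OPPORTUNISTIC container now fits and is promoted where it runs.
    node.unallocated = new Resource(2048, 1);
    System.out.println("promoted after resources free up: "
        + node.tryToPromote(opportunistic));            // true
    System.out.println("guaranteed now: " + node.allocatedGuaranteed);
  }
}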