http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ef338d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 1227de2..ac925c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -55,14 +55,20 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; 
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -72,6 +78,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -93,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -1056,15 +1065,15 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). 
- getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(2, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getVirtualCores()); + getGuaranteedResourceUsage().getVirtualCores()); // verify metrics QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1") @@ -1099,7 +1108,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -1109,7 +1118,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize()); // Now another node checks in with capacity @@ -1123,7 +1132,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure this goes to queue 2 assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // The old reservation should still be there... 
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize()); @@ -1133,7 +1142,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { } - @Test (timeout = 5000) + @Test public void testOffSwitchAppReservationThreshold() throws Exception { conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f); scheduler.init(conf); @@ -1173,7 +1182,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify capacity allocation assertEquals(6144, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Create new app with a resource request that can be satisfied by any // node but would be @@ -1205,7 +1214,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -1266,7 +1275,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Create new app with a resource request that can be satisfied by any // node but would be @@ -1311,7 +1320,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); assertEquals(10240, scheduler.getQueueManager().getQueue("queue1"). 
- getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -1355,7 +1364,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Verify number of reservations have decremented assertEquals(0, @@ -1399,7 +1408,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1408,7 +1417,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); @@ -1534,7 +1543,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1543,7 +1552,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). 
- getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); @@ -1583,12 +1592,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure allocated memory of queue1 doesn't exceed its maximum assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); //the reservation of queue1 should be reclaim assertEquals(0, scheduler.getSchedulerApp(attId1). getCurrentReservation().getMemorySize()); assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); } @Test @@ -1628,7 +1637,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests below threshold ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -1637,7 +1646,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 has no reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(0, scheduler.getSchedulerApp(attId).getReservedContainers().size()); @@ -1648,7 +1657,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). 
- getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() .getVirtualCores()); @@ -1663,7 +1672,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure this goes to queue 2 assertEquals(3, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getVirtualCores()); + getGuaranteedResourceUsage().getVirtualCores()); // The old reservation should still be there... assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() @@ -2696,7 +2705,361 @@ public class TestFairScheduler extends FairSchedulerTestBase { 2, liveContainers.iterator().next().getContainer(). getPriority().getPriority()); } - + + /** + * Test that NO OPPORTUNISTIC containers can be allocated on a node that + * is fully allocated and with a very high utilization. + */ + @Test + public void testAllocateNoOpportunisticContainersOnBusyNode() + throws IOException { + conf.setBoolean( + YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 2G of memory and 2 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(2048, 2), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up the node's full memory + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization shoots up after the container runs on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(2000, 0, 0.8f)); + + // create another scheduling request + ApplicationAttemptId appAttempt2 + = createSchedulingRequest(100, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue("Expecting no containers allocated", + allocatedContainers2.size() == 0); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + + // verify that a reservation is made for the second resource request + Resource reserved = scheduler.getNode(node.getNodeID()). 
+ getReservedContainer().getReservedResource(); + assertTrue("Expect a reservation made for the second resource request", + reserved.equals(Resource.newInstance(100, 1))); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** + * Test that OPPORTUNISTIC containers can be allocated on a node with low + * utilization even though there is not enough unallocated resource on the + * node to accommodate the request. + */ + @Test + public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode() + throws IOException { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that leaves some unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3600, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3600, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1800, 0, 0.5f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + + // verify that no reservation is made for the second request given + // that it's satisfied by an OPPORTUNISTIC container allocation. 
+ assertTrue("No reservation should be made because we have satisfied" + + " the second request with an OPPORTUNISTIC container allocation", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** + * Test opportunistic containers can be allocated on a node that is fully + * allocated but whose utilization is very low. + */ + @Test + public void testAllocateOpportunisticContainersOnFullyAllocatedNode() + throws IOException { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up the whole node + ApplicationAttemptId appAttempt1 = createSchedulingRequest( + 4096, "queue1", "user1", 4); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1800, 0, 0.5f)); + + // create another scheduling request now that there is no unallocated + // resources left on the node, the request should be served with an + // allocation of an opportunistic container + ApplicationAttemptId appAttempt2 = createSchedulingRequest( + 1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + + // verify that no reservation is made for the second request given + // that it's satisfied by an OPPORTUNISTIC container allocation. 
+ assertTrue("No reservation should be made because we have satisfied" + + " the second request with an OPPORTUNISTIC container allocation", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** + * Test opportunistic containers can be allocated on a node with a low + * utilization even though there are GUARANTEED containers allocated. + */ + @Test + public void testAllocateOpportunisticContainersWithGuaranteedOnes() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3200, "queue1", "user1", 3); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3200, scheduler.getQueueManager().getQueue("queue1"). 
+ getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(512, 0, 0.1f)); + + // create two other scheduling requests which in aggregate ask for more + // than what's left unallocated on the node. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(512, "queue2", "user1", 1); + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1024, "queue3", "user1", 1); + + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(512, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers2.get(0).getExecutionType()); + + List<Container> allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers3.get(0).getExecutionType()); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). 
+ getOpportunisticResourceUsage().getMemorySize()); + + // verify that no reservation is made given that the second request should + // be satisfied by a GUARANTEED container allocation, the third by an + // OPPORTUNISTIC container allocation. + assertTrue("No reservation should be made.", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + @Test public void testAclSubmitApplication() throws Exception { // Set acl's @@ -3686,7 +4049,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { .createAbnormalContainerStatus(container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.FINISHED); - assertEquals(Resources.none(), app1.getResourceUsage()); + assertEquals(Resources.none(), app1.getGuaranteedResourceUsage()); } @Test @@ -3786,7 +4149,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application1's AM should be finished", 0, app1.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app1.getResourceUsage()); + Resources.none(), app1.getGuaranteedResourceUsage()); assertEquals("Application3's AM should be running", 1, app3.getLiveContainers().size()); assertEquals("Application3's AM requests 1024 MB memory", @@ -3806,7 +4169,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application4's AM should not be running", 0, app4.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app4.getResourceUsage()); + Resources.none(), app4.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 
2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3822,7 +4185,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3835,7 +4198,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3851,11 +4214,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application2's AM should be finished", 0, app2.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app2.getResourceUsage()); + Resources.none(), app2.getGuaranteedResourceUsage()); assertEquals("Application3's AM should be finished", 0, app3.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app3.getResourceUsage()); + Resources.none(), app3.getGuaranteedResourceUsage()); assertEquals("Application5's AM should be running", 1, app5.getLiveContainers().size()); assertEquals("Application5's AM requests 2048 MB memory", @@ -3876,7 +4239,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application5's AM should have 0 container", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + 
Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); scheduler.update(); @@ -3900,7 +4263,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application6's AM should not be running", 0, app6.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app6.getResourceUsage()); + Resources.none(), app6.getGuaranteedResourceUsage()); assertEquals("Application6's AM resource shouldn't be updated", 0, app6.getAMResource().getMemorySize()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -4615,17 +4978,25 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true); FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true); - Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB); + Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2"); - Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0); + Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + 
Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(), + 0); } @Test (expected = YarnException.class) @@ -4665,7 +5036,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(updateEvent); scheduler.handle(updateEvent); - assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage()); + assertEquals(Resource.newInstance(2048, 2), + oldQueue.getGuaranteedResourceUsage()); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } @@ -5089,7 +5461,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(new NodeUpdateSchedulerEvent(node2)); assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); //container will be reserved at node1 RMContainer reservedContainer1 = @@ -5109,7 +5481,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { app1, RMAppAttemptState.KILLED, false)); assertEquals(0, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // container will be allocated at node2 scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -5257,10 +5629,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app1 = mock(FSAppAttempt.class); Mockito.when(app1.getDemand()).thenReturn(maxResource); - Mockito.when(app1.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app1.getGuaranteedResourceUsage()). + thenReturn(Resources.none()); FSAppAttempt app2 = mock(FSAppAttempt.class); Mockito.when(app2.getDemand()).thenReturn(maxResource); - Mockito.when(app2.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app2.getGuaranteedResourceUsage()). 
+ thenReturn(Resources.none()); QueueManager queueManager = scheduler.getQueueManager(); FSParentQueue queue1 = queueManager.getParentQueue("queue1", true); @@ -5316,7 +5690,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { child1.setMaxShare(new ConfigurableResource(resource)); FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(resource); - Mockito.when(app.getResourceUsage()).thenReturn(resource); + Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource); child1.addApp(app, true); child1.updateDemand(); @@ -5352,7 +5726,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { + " SteadyFairShare: <memory:0, vCores:0>," + " MaxShare: <memory:4096, vCores:4>," + " MinShare: <memory:0, vCores:0>," - + " ResourceUsage: <memory:4096, vCores:4>," + + " Guaranteed ResourceUsage: <memory:4096, vCores:4>," + " Demand: <memory:4096, vCores:4>," + " MaxAMShare: 0.5," + " Runnable: 0}";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ef338d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index b016c1b..6777b5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -243,11 +243,16 @@ public class TestSchedulingPolicy {
       }
 
       @Override
-      public Resource getResourceUsage() {
+      public Resource getGuaranteedResourceUsage() {
         return usage;
       }
 
       @Override
+      public Resource getOpportunisticResourceUsage() {
+        return Resource.newInstance(0, 0);
+      }
+
+      @Override
       public Resource getMinShare() {
         return minShare;
       }
@@ -278,7 +283,8 @@ public class TestSchedulingPolicy {
       }
 
       @Override
-      public Resource assignContainer(FSSchedulerNode node) {
+      public Resource assignContainer(FSSchedulerNode node,
+          boolean opportunistic) {
         throw new UnsupportedOperationException();
       }
 
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org