YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05b729c2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05b729c2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05b729c2 Branch: refs/heads/YARN-1011 Commit: 05b729c2c56b291f3c8bdfc7d6c7d3b502f219f5 Parents: 0af7353 Author: Miklos Szegedi <miklos.szeg...@cloudera.com> Authored: Wed Nov 22 09:03:05 2017 -0800 Committer: Haibo Chen <haiboc...@apache.org> Committed: Wed Jan 3 12:30:03 2018 -0800 ---------------------------------------------------------------------- .../scheduler/common/PendingAsk.java | 14 +++- .../scheduler/fair/FSAppAttempt.java | 5 ++ .../LocalityAppPlacementAllocator.java | 27 ++++++- .../scheduler/fair/FairSchedulerTestBase.java | 34 ++++++++- .../scheduler/fair/TestFairScheduler.java | 77 ++++++++++++++++++++ 5 files changed, 149 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java index 85d8715..e9931a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java @@ -29,11 +29,15 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class PendingAsk { private final Resource perAllocationResource; private final int count; - public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0); + private final boolean isGuaranteedTypeEnforced; - public PendingAsk(Resource res, int num) { + public final static PendingAsk ZERO = + new PendingAsk(Resources.none(), 0, false); + + public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) { this.perAllocationResource = res; this.count = num; + this.isGuaranteedTypeEnforced = guaranteedTypeEnforced; } public Resource getPerAllocationResource() { @@ -44,11 +48,17 @@ public class PendingAsk { return count; } + public boolean isGuaranteedTypeEnforced() { + return isGuaranteedTypeEnforced; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("<per-allocation-resource="); sb.append(getPerAllocationResource()); + sb.append(", isGuaranteedEnforced="); + sb.append(isGuaranteedTypeEnforced()); sb.append(",repeat="); sb.append(getCount()); sb.append(">"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 2bdac0a..8a89f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -860,6 +860,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt FSSchedulerNode node, PendingAsk pendingAsk, NodeType type, boolean reserved, boolean opportunistic, SchedulerRequestKey schedulerKey) { + if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) { + // do not attempt to assign an OPPORTUNISTIC container to a resource + // request that has explicitly opted out of oversubscription + return Resources.none(); + } // How much does this request need? Resource capability = pendingAsk.getPerAllocationResource(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 766827c..88a6acb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; @@ -159,13 +161,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode> PendingAsk lastPendingAsk = lastRequest == null ? null : new PendingAsk( - lastRequest.getCapability(), lastRequest.getNumContainers()); + lastRequest.getCapability(), lastRequest.getNumContainers(), + enforceGuaranteedExecutionType(lastRequest)); String lastRequestedNodePartition = lastRequest == null ? null : lastRequest.getNodeLabelExpression(); updateResult = new PendingAskUpdateResult(lastPendingAsk, new PendingAsk(request.getCapability(), - request.getNumContainers()), lastRequestedNodePartition, + request.getNumContainers(), + enforceGuaranteedExecutionType(request)), + lastRequestedNodePartition, request.getNodeLabelExpression()); } } @@ -192,8 +197,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode> if (null == request) { return PendingAsk.ZERO; } else{ + boolean guaranteedEnforced = enforceGuaranteedExecutionType(request); return new PendingAsk(request.getCapability(), - request.getNumContainers()); + request.getNumContainers(), guaranteedEnforced); } } finally { readLock.unlock(); @@ -201,6 +207,21 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode> } + /** + * Check for a given ResourceRequest, if its guaranteed execution type + * needs to be enforced. + * @param request resource request + * @return true if its guaranteed execution type is to be enforced. + * false otherwise + */ + private static boolean enforceGuaranteedExecutionType( + ResourceRequest request) { + ExecutionTypeRequest executionType = request.getExecutionTypeRequest(); + return executionType != null && + executionType.getExecutionType() == ExecutionType.GUARANTEED && + executionType.getEnforceExecutionType(); + } + @Override public int getOutstandingAsksCount(String resourceName) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd..6d9df4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -115,9 +117,15 @@ public class FairSchedulerTestBase { relaxLocality); } + protected ResourceRequest createResourceRequest(int memory, int vcores, + String host, int priority, int numContainers, boolean relaxLocality) { + return createResourceRequest(memory, vcores, host, priority, + numContainers, relaxLocality, false); + } + protected ResourceRequest createResourceRequest( int memory, int vcores, String host, int priority, int numContainers, - boolean relaxLocality) { + boolean relaxLocality, boolean guaranteedExecutionEnforced) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); request.setCapability(BuilderUtils.newResource(memory, vcores)); request.setResourceName(host); @@ -127,6 +135,11 @@ public class FairSchedulerTestBase { request.setPriority(prio); request.setRelaxLocality(relaxLocality); request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + if (guaranteedExecutionEnforced) { + ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true); + request.setExecutionTypeRequest(executionType); + } return request; } @@ -150,6 +163,13 @@ public class FairSchedulerTestBase { } protected ApplicationAttemptId createSchedulingRequest( + int memory, String queueId, String userId, + int numContainers, boolean guaranteedExecutionEnforced) { + return createSchedulingRequest(memory, 1, queueId, + userId, numContainers, 1, guaranteedExecutionEnforced); + } + + protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers) { return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); } @@ -163,6 +183,13 @@ public class FairSchedulerTestBase { protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { + return createSchedulingRequest(memory, vcores, queueId, userId, + numContainers, priority, false); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority, boolean guaranteedExecutionEnforced) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected @@ -171,8 +198,9 @@ public class FairSchedulerTestBase { scheduler.addApplicationAttempt(id, false, false); } List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); + ResourceRequest request = createResourceRequest(memory, vcores, + ResourceRequest.ANY, priority, numContainers, true, + guaranteedExecutionEnforced); ask.add(request); RMApp rmApp = mock(RMApp.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e70053c..d533617 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2704,6 +2704,83 @@ public class TestFairScheduler extends FairSchedulerTestBase { getPriority().getPriority()); } + @Test + public void testResourceRequestOptOutOfOversubscription() throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that leaves some unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3600, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3600, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container runs on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that opts out of oversubscription and + // asks for more than what's left unallocated on the node. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1536, "queue2", "user1", 1, true); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 0); + + // verify that a reservation is made for the second request + assertTrue("A reservation should be made for the second request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(1536, 1))); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + /** * Test that NO OPPORTUNISTIC containers can be allocated on a node that * is fully allocated and with a very high utilization. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org