YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ceb4edbd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ceb4edbd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ceb4edbd Branch: refs/heads/YARN-1011 Commit: ceb4edbde76b74a782088e4e2066089a37b16ec4 Parents: 04c1424 Author: Miklos Szegedi <szege...@apache.org> Authored: Wed Nov 22 09:03:05 2017 -0800 Committer: Haibo Chen <haiboc...@apache.org> Committed: Mon Jun 4 21:01:33 2018 -0700 ---------------------------------------------------------------------- .../scheduler/common/PendingAsk.java | 15 +++- .../scheduler/fair/FSAppAttempt.java | 5 ++ .../LocalityAppPlacementAllocator.java | 27 ++++++- .../scheduler/fair/FairSchedulerTestBase.java | 40 ++++++++-- .../scheduler/fair/TestFairScheduler.java | 77 ++++++++++++++++++++ 5 files changed, 153 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ceb4edbd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java index 2ed3e83..470dbbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java @@ -30,16 +30,21 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class PendingAsk { private final Resource perAllocationResource; private final int count; - public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0); + public final static PendingAsk ZERO = + new PendingAsk(Resources.none(), 0, false); + + private final boolean isGuaranteedTypeEnforced; public PendingAsk(ResourceSizing sizing) { this.perAllocationResource = sizing.getResources(); this.count = sizing.getNumAllocations(); + this.isGuaranteedTypeEnforced = true; } - public PendingAsk(Resource res, int num) { + public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) { this.perAllocationResource = res; this.count = num; + this.isGuaranteedTypeEnforced = guaranteedTypeEnforced; } public Resource getPerAllocationResource() { @@ -50,11 +55,17 @@ public class PendingAsk { return count; } + public boolean isGuaranteedTypeEnforced() { + return isGuaranteedTypeEnforced; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("<per-allocation-resource="); sb.append(getPerAllocationResource()); + sb.append(", isGuaranteedEnforced="); + sb.append(isGuaranteedTypeEnforced()); sb.append(",repeat="); sb.append(getCount()); sb.append(">"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ceb4edbd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 11bc773..1ac229b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -859,6 +859,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt FSSchedulerNode node, PendingAsk pendingAsk, NodeType type, boolean reserved, boolean opportunistic, SchedulerRequestKey schedulerKey) { + if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) { + // do not attempt to assign an OPPORTUNISTIC container to a resource + // request that has explicitly opted out of oversubscription + return Resources.none(); + } // How much does this request need? Resource capability = pendingAsk.getPerAllocationResource(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ceb4edbd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index a0358b4..38acda6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; @@ -158,13 +160,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode> PendingAsk lastPendingAsk = lastRequest == null ? null : new PendingAsk( - lastRequest.getCapability(), lastRequest.getNumContainers()); + lastRequest.getCapability(), lastRequest.getNumContainers(), + enforceGuaranteedExecutionType(lastRequest)); String lastRequestedNodePartition = lastRequest == null ? null : lastRequest.getNodeLabelExpression(); updateResult = new PendingAskUpdateResult(lastPendingAsk, new PendingAsk(request.getCapability(), - request.getNumContainers()), lastRequestedNodePartition, + request.getNumContainers(), + enforceGuaranteedExecutionType(request)), + lastRequestedNodePartition, request.getNodeLabelExpression()); } } @@ -204,8 +209,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode> if (null == request) { return PendingAsk.ZERO; } else{ + boolean guaranteedEnforced = enforceGuaranteedExecutionType(request); return new PendingAsk(request.getCapability(), - request.getNumContainers()); + request.getNumContainers(), guaranteedEnforced); } } finally { readLock.unlock(); @@ -213,6 +219,21 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode> } + /** + * Check for a given ResourceRequest, if its guaranteed execution type + * needs to be enforced. + * @param request resource request + * @return true if its guaranteed execution type is to be enforced. + * false otherwise + */ + private static boolean enforceGuaranteedExecutionType( + ResourceRequest request) { + ExecutionTypeRequest executionType = request.getExecutionTypeRequest(); + return executionType != null && + executionType.getExecutionType() == ExecutionType.GUARANTEED && + executionType.getEnforceExecutionType(); + } + @Override public int getOutstandingAsksCount(String resourceName) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ceb4edbd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 3ac3849..43a3931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -117,9 +119,15 @@ public class FairSchedulerTestBase { relaxLocality); } + protected ResourceRequest createResourceRequest(int memory, int vcores, + String host, int priority, int numContainers, boolean relaxLocality) { + return createResourceRequest(memory, vcores, host, priority, + numContainers, relaxLocality, false); + } + protected ResourceRequest createResourceRequest( int memory, int vcores, String host, int priority, int numContainers, - boolean relaxLocality) { + boolean relaxLocality, boolean guaranteedExecutionEnforced) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); request.setCapability(BuilderUtils.newResource(memory, vcores)); request.setResourceName(host); @@ -129,6 +137,11 @@ public class FairSchedulerTestBase { request.setPriority(prio); request.setRelaxLocality(relaxLocality); request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + if (guaranteedExecutionEnforced) { + ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true); + request.setExecutionTypeRequest(executionType); + } return request; } @@ -152,6 +165,13 @@ public class FairSchedulerTestBase { } protected ApplicationAttemptId createSchedulingRequest( + int memory, String queueId, String userId, + int numContainers, boolean guaranteedExecutionEnforced) { + return createSchedulingRequest(memory, 1, queueId, + userId, numContainers, 1, guaranteedExecutionEnforced); + } + + protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers) { return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); } @@ -165,16 +185,24 @@ public class FairSchedulerTestBase { protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { - ResourceRequest request = createResourceRequest(memory, vcores, - ResourceRequest.ANY, priority, numContainers, true); + return createSchedulingRequest(memory, vcores, queueId, + userId, numContainers, priority, false); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority, boolean guaranteedExecutionEnforced) { + ResourceRequest request = createResourceRequest( + memory, vcores, ResourceRequest.ANY, priority, + numContainers, true, guaranteedExecutionEnforced); return createSchedulingRequest(Lists.newArrayList(request), queueId, - userId); + userId); } protected ApplicationAttemptId createSchedulingRequest( Collection<ResourceRequest> requests, String queueId, String userId) { - ApplicationAttemptId id = - createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, + this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. http://git-wip-us.apache.org/repos/asf/hadoop/blob/ceb4edbd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ff620b8..fbb7243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2712,6 +2712,83 @@ public class TestFairScheduler extends FairSchedulerTestBase { getPriority().getPriority()); } + @Test + public void testResourceRequestOptOutOfOversubscription() throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that leaves some unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3600, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3600, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container runs on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that opts out of oversubscription and + // asks for more than what's left unallocated on the node. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1536, "queue2", "user1", 1, true); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + List<Container> allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 0); + + // verify that a reservation is made for the second request + assertTrue("A reservation should be made for the second request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(1536, 1))); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + /** * Test that NO OPPORTUNISTIC containers can be allocated on a node that * is fully allocated and with a very high utilization. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org