http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index b65f16a..1b20556 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1134,12 +1134,7 @@ public class LeafQueue extends AbstractCSQueue { if (targetLeafQueue == this) { // When trying to preempt containers from the same queue - if (rmContainer.hasIncreaseReservation()) { - // Increased container reservation - unreserveIncreasedContainer(clusterResource, - schedulerContainer.getSchedulerApplicationAttempt(), - schedulerContainer.getSchedulerNode(), rmContainer); - } else if (rmContainer.getState() == RMContainerState.RESERVED) { + if (rmContainer.getState() == RMContainerState.RESERVED) { // For other reserved containers // This is a reservation exchange, complete previous reserved container completedContainer(clusterResource, @@ -1212,8 +1207,7 @@ public class LeafQueue extends AbstractCSQueue { schedulerContainer.getSchedulerApplicationAttempt(), allocation.getAllocatedOrReservedResource(), schedulerContainer.getNodePartition(), - schedulerContainer.getRmContainer(), - allocation.isIncreasedAllocation()); + schedulerContainer.getRmContainer()); orderingPolicy.containerAllocated( schedulerContainer.getSchedulerApplicationAttempt(), schedulerContainer.getRmContainer()); @@ -1446,40 +1440,6 @@ public class LeafQueue extends AbstractCSQueue { readLock.unlock(); } } - - @Override - public void unreserveIncreasedContainer(Resource clusterResource, - FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { - boolean removed = false; - Priority priority = null; - - try { - writeLock.lock(); - if (rmContainer.getContainer() != null) { - priority = rmContainer.getContainer().getPriority(); - } - - if (null != priority) { - removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node, - rmContainer); - } - - if (removed) { - // Inform the ordering policy - orderingPolicy.containerReleased(app, rmContainer); - - releaseResource(clusterResource, app, rmContainer.getReservedResource(), - node.getPartition(), rmContainer, true); - } - } finally { - writeLock.unlock(); - } - - if (removed) { - getParent().unreserveIncreasedContainer(clusterResource, app, node, - rmContainer); - } - } private void updateSchedulerHealthForCompletedContainer( RMContainer rmContainer, ContainerStatus containerStatus) { @@ -1538,16 +1498,6 @@ public class LeafQueue extends AbstractCSQueue { updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus); if (application != null) { - // unreserve container increase request if it previously reserved. - if (rmContainer.hasIncreaseReservation()) { - unreserveIncreasedContainer(clusterResource, application, node, - rmContainer); - } - - // Remove container increase request if it exists - application.removeIncreaseRequest(node.getNodeID(), - rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainerId()); - boolean removed = false; // Careful! Locking order is important! @@ -1576,7 +1526,7 @@ public class LeafQueue extends AbstractCSQueue { orderingPolicy.containerReleased(application, rmContainer); releaseResource(clusterResource, application, container.getResource(), - node.getPartition(), rmContainer, false); + node.getPartition(), rmContainer); } } finally { writeLock.unlock(); @@ -1597,12 +1547,10 @@ public class LeafQueue extends AbstractCSQueue { void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, - String nodePartition, RMContainer rmContainer, - boolean isIncreasedAllocation) { + String nodePartition, RMContainer rmContainer) { try { writeLock.lock(); - super.allocateResource(clusterResource, resource, nodePartition, - isIncreasedAllocation); + super.allocateResource(clusterResource, resource, nodePartition); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1643,11 +1591,10 @@ public class LeafQueue extends AbstractCSQueue { void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, - RMContainer rmContainer, boolean isChangeResource) { + RMContainer rmContainer) { try { writeLock.lock(); - super.releaseResource(clusterResource, resource, nodePartition, - isChangeResource); + super.releaseResource(clusterResource, resource, nodePartition); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1787,7 +1734,7 @@ public class LeafQueue extends AbstractCSQueue { rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, rmContainer.getContainer().getResource(), node.getPartition(), - rmContainer, false); + rmContainer); } finally { writeLock.unlock(); } @@ -1912,7 +1859,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition(), rmContainer, false); + .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1931,7 +1878,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition(), rmContainer, false); + .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() @@ -2000,85 +1947,6 @@ public class LeafQueue extends AbstractCSQueue { return defaultAppPriorityPerQueue; } - /** - * - * @param clusterResource Total cluster resource - * @param decreaseRequest The decrease request - * @param app The application of interest - */ - @Override - public void decreaseContainer(Resource clusterResource, - SchedContainerChangeRequest decreaseRequest, - FiCaSchedulerApp app) throws InvalidResourceRequestException { - // If the container being decreased is reserved, we need to unreserve it - // first. - RMContainer rmContainer = decreaseRequest.getRMContainer(); - if (rmContainer.hasIncreaseReservation()) { - unreserveIncreasedContainer(clusterResource, app, - (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer); - } - boolean resourceDecreased = false; - Resource resourceBeforeDecrease; - // Grab queue lock to avoid race condition when getting container resource - - try { - writeLock.lock(); - // Make sure the decrease request is valid in terms of current resource - // and target resource. This must be done under the leaf queue lock. - // Throws exception if the check fails. - RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false); - // Save resource before decrease for debug log - resourceBeforeDecrease = Resources.clone( - rmContainer.getAllocatedResource()); - // Do we have increase request for the same container? If so, remove it - boolean hasIncreaseRequest = app.removeIncreaseRequest( - decreaseRequest.getNodeId(), - decreaseRequest.getRMContainer().getAllocatedSchedulerKey(), - decreaseRequest.getContainerId()); - if (hasIncreaseRequest) { - if (LOG.isDebugEnabled()) { - LOG.debug("While processing decrease requests, found an increase" - + " request for the same container " + decreaseRequest - .getContainerId() + ", removed the increase request"); - } - } - // Delta capacity is negative when it's a decrease request - Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); - if (Resources.equals(absDelta, Resources.none())) { - // If delta capacity of this decrease request is 0, this decrease - // request serves the purpose of cancelling an existing increase request - // if any - if (LOG.isDebugEnabled()) { - LOG.debug("Decrease target resource equals to existing resource for" - + " container:" + decreaseRequest.getContainerId() - + " ignore this decrease request."); - } - } else{ - // Release the delta resource - releaseResource(clusterResource, app, absDelta, - decreaseRequest.getNodePartition(), - decreaseRequest.getRMContainer(), true); - // Notify application - app.decreaseContainer(decreaseRequest); - // Notify node - decreaseRequest.getSchedulerNode().decreaseContainer( - decreaseRequest.getContainerId(), absDelta); - resourceDecreased = true; - } - } finally { - writeLock.unlock(); - } - - if (resourceDecreased) { - // Notify parent queue outside of leaf queue lock - getParent().decreaseContainer(clusterResource, decreaseRequest, app); - LOG.info("Application attempt " + app.getApplicationAttemptId() - + " decreased container:" + decreaseRequest.getContainerId() - + " from " + resourceBeforeDecrease + " to " - + decreaseRequest.getTargetCapacity()); - } - } - public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app, Priority newAppPriority) { try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 9c42c61..6f82fcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -773,12 +773,11 @@ public class ParentQueue extends AbstractCSQueue { } private void internalReleaseResource(Resource clusterResource, - FiCaSchedulerNode node, Resource releasedResource, - boolean changeResource) { + FiCaSchedulerNode node, Resource releasedResource) { try { writeLock.lock(); super.releaseResource(clusterResource, releasedResource, - node.getPartition(), changeResource); + node.getPartition()); if (LOG.isDebugEnabled()) { LOG.debug( @@ -789,38 +788,6 @@ public class ParentQueue extends AbstractCSQueue { writeLock.unlock(); } } - - @Override - public void decreaseContainer(Resource clusterResource, - SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) - throws InvalidResourceRequestException { - // delta capacity is negative when it's a decrease request - Resource absDeltaCapacity = - Resources.negate(decreaseRequest.getDeltaCapacity()); - - internalReleaseResource(clusterResource, - csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false); - - // Inform the parent - if (parent != null) { - parent.decreaseContainer(clusterResource, decreaseRequest, app); - } - } - - @Override - public void unreserveIncreasedContainer(Resource clusterResource, - FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { - if (app != null) { - internalReleaseResource(clusterResource, node, - rmContainer.getReservedResource(), false); - - // Inform the parent - if (parent != null) { - parent.unreserveIncreasedContainer(clusterResource, app, node, - rmContainer); - } - } - } @Override public void completedContainer(Resource clusterResource, @@ -830,7 +797,7 @@ public class ParentQueue extends AbstractCSQueue { boolean sortQueues) { if (application != null) { internalReleaseResource(clusterResource, node, - rmContainer.getContainer().getResource(), false); + rmContainer.getContainer().getResource()); // Inform the parent if (parent != null) { @@ -886,7 +853,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode( rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, - rmContainer.getContainer().getResource(), node.getPartition(), false); + rmContainer.getContainer().getResource(), node.getPartition()); } finally { writeLock.unlock(); } @@ -923,7 +890,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getPartition(), false); + .getResource(), node.getPartition()); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" @@ -943,7 +910,7 @@ public class ParentQueue extends AbstractCSQueue { scheduler.getNode(rmContainer.getContainer().getNodeId()); super.releaseResource(clusterResource, rmContainer.getContainer().getResource(), - node.getPartition(), false); + node.getPartition()); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" @@ -960,11 +927,10 @@ public class ParentQueue extends AbstractCSQueue { } void allocateResource(Resource clusterResource, - Resource resource, String nodePartition, boolean changeContainerResource) { + Resource resource, String nodePartition) { try { writeLock.lock(); - super.allocateResource(clusterResource, resource, nodePartition, - changeContainerResource); + super.allocateResource(clusterResource, resource, nodePartition); /** * check if we need to kill (killable) containers if maximum resource violated. @@ -1054,8 +1020,7 @@ public class ParentQueue extends AbstractCSQueue { // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(cluster, allocation.getAllocatedOrReservedResource(), - schedulerContainer.getNodePartition(), - allocation.isIncreasedAllocation()); + schedulerContainer.getNodePartition()); LOG.info("assignedContainer" + " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 57188d8..4879fae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerAllocator extends AbstractContainerAllocator { - private AbstractContainerAllocator increaseContainerAllocator; private AbstractContainerAllocator regularContainerAllocator; public ContainerAllocator(FiCaSchedulerApp application, @@ -45,8 +44,6 @@ public class ContainerAllocator extends AbstractContainerAllocator { RMContext rmContext, ActivitiesManager activitiesManager) { super(application, rc, rmContext); - increaseContainerAllocator = - new IncreaseContainerAllocator(application, rc, rmContext); regularContainerAllocator = new RegularContainerAllocator(application, rc, rmContext, activitiesManager); } @@ -55,32 +52,8 @@ public class ContainerAllocator extends AbstractContainerAllocator { public CSAssignment assignContainers(Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { - if (reservedContainer != null) { - if (reservedContainer.getState() == RMContainerState.RESERVED) { - // It's a regular container - return regularContainerAllocator.assignContainers(clusterResource, - ps, schedulingMode, resourceLimits, reservedContainer); - } else { - // It's a increase container - return increaseContainerAllocator.assignContainers(clusterResource, - ps, schedulingMode, resourceLimits, reservedContainer); - } - } else { - /* - * Try to allocate increase container first, and if we failed to allocate - * anything, we will try to allocate regular container - */ - CSAssignment assign = - increaseContainerAllocator.assignContainers(clusterResource, ps, - schedulingMode, resourceLimits, null); - if (Resources.greaterThan(rc, clusterResource, assign.getResource(), - Resources.none())) { - return assign; - } - - return regularContainerAllocator.assignContainers(clusterResource, ps, - schedulingMode, resourceLimits, null); - } + return regularContainerAllocator.assignContainers(clusterResource, + ps, schedulingMode, resourceLimits, reservedContainer); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java deleted file mode 100644 index 0dc527f..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -public class IncreaseContainerAllocator extends AbstractContainerAllocator { - private static final Log LOG = - LogFactory.getLog(IncreaseContainerAllocator.class); - - public IncreaseContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); - } - - /** - * Quick check if we can allocate anything here: - * We will not continue if: - * - Headroom doesn't support allocate minimumAllocation - * - - */ - private boolean checkHeadroom(Resource clusterResource, - ResourceLimits currentResourceLimits, Resource required) { - return Resources.greaterThanOrEqual(rc, clusterResource, - currentResourceLimits.getHeadroom(), required); - } - - private CSAssignment createReservedIncreasedCSAssignment( - SchedContainerChangeRequest request) { - CSAssignment assignment = - new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, - application, CSAssignment.SkippedType.NONE, false); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - request.getDeltaCapacity()); - assignment.getAssignmentInformation().incrReservations(); - assignment.getAssignmentInformation().addReservationDetails( - request.getRMContainer(), application.getCSLeafQueue().getQueuePath()); - assignment.setIncreasedAllocation(true); - - LOG.info("Reserved increase container request:" + request.toString()); - - return assignment; - } - - private CSAssignment createSuccessfullyIncreasedCSAssignment( - SchedContainerChangeRequest request, boolean fromReservation) { - CSAssignment assignment = - new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, - application, CSAssignment.SkippedType.NONE, fromReservation); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - request.getDeltaCapacity()); - assignment.getAssignmentInformation().incrAllocations(); - assignment.getAssignmentInformation().addAllocationDetails( - request.getRMContainer(), application.getCSLeafQueue().getQueuePath()); - assignment.setIncreasedAllocation(true); - - if (fromReservation) { - assignment.setFulfilledReservedContainer(request.getRMContainer()); - } - - // notify application - application - .getCSLeafQueue() - .getOrderingPolicy() - .containerAllocated(application, - application.getRMContainer(request.getContainerId())); - - LOG.info("Approved increase container request:" + request.toString() - + " fromReservation=" + fromReservation); - - return assignment; - } - - private CSAssignment allocateIncreaseRequestFromReservedContainer( - SchedulerNode node, Resource cluster, - SchedContainerChangeRequest increaseRequest) { - if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(), - node.getUnallocatedResource())) { - return createSuccessfullyIncreasedCSAssignment(increaseRequest, true); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to allocate reserved increase request:" - + increaseRequest.toString() - + ". There's no enough available resource"); - } - - // We still cannot allocate this container, will wait for next turn - return CSAssignment.SKIP_ASSIGNMENT; - } - } - - private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node, - Resource cluster, SchedContainerChangeRequest increaseRequest) { - if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(), - node.getUnallocatedResource())) { - return createSuccessfullyIncreasedCSAssignment(increaseRequest, false); - } else{ - // We cannot allocate this container, but since queue capacity / - // user-limit matches, we can reserve this container on this node. - return createReservedIncreasedCSAssignment(increaseRequest); - } - } - - @Override - public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer) { - AppSchedulingInfo sinfo = application.getAppSchedulingInfo(); - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); - - if (null == node) { - // This is global scheduling enabled - // FIXME, support container increase when global scheduling enabled - return CSAssignment.SKIP_ASSIGNMENT; - } - NodeId nodeId = node.getNodeID(); - - if (reservedContainer == null) { - // Do we have increase request on this node? - if (!sinfo.hasIncreaseRequest(nodeId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip allocating increase request since we don't have any" - + " increase request on this node=" + node.getNodeID()); - } - - return CSAssignment.SKIP_ASSIGNMENT; - } - - // Check if we need to unreserve something, note that we don't support - // continuousReservationLooking now. TODO, need think more about how to - // support it. - boolean shouldUnreserve = - Resources.greaterThan(rc, clusterResource, - resourceLimits.getAmountNeededUnreserve(), Resources.none()); - - // Check if we can allocate minimum resource according to headroom - boolean cannotAllocateAnything = - !checkHeadroom(clusterResource, resourceLimits, rmContext - .getScheduler().getMinimumResourceCapability()); - - // Skip the app if we failed either of above check - if (cannotAllocateAnything || shouldUnreserve) { - if (LOG.isDebugEnabled()) { - if (shouldUnreserve) { - LOG.debug("Cannot continue since we have to unreserve some resource" - + ", now increase container allocation doesn't " - + "support continuous reservation looking.."); - } - if (cannotAllocateAnything) { - LOG.debug("We cannot allocate anything because of low headroom, " - + "headroom=" + resourceLimits.getHeadroom()); - } - } - - return CSAssignment.SKIP_ASSIGNMENT; - } - - CSAssignment assigned = null; - - /* - * Loop each priority, and containerId. Container priority is not - * equivalent to request priority, application master can run an important - * task on a less prioritized container. - * - * So behavior here is, we still try to increase container with higher - * priority, but will skip increase request and move to next increase - * request if queue-limit or user-limit aren't satisfied - */ - for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Looking at increase request for application=" - + application.getApplicationAttemptId() + " priority=" - + schedulerKey.getPriority()); - } - - /* - * If we have multiple to-be-increased containers under same priority on - * a same host, we will try to increase earlier launched container - * first. And again - we will skip a request and move to next if it - * cannot be allocated. - */ - Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap = - sinfo.getIncreaseRequests(nodeId, schedulerKey); - - // We don't have more increase request on this priority, skip.. - if (null == increaseRequestMap) { - if (LOG.isDebugEnabled()) { - LOG.debug("There's no increase request for " - + application.getApplicationAttemptId() + " priority=" - + schedulerKey.getPriority()); - } - continue; - } - Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter = - increaseRequestMap.entrySet().iterator(); - - while (iter.hasNext()) { - Entry<ContainerId, SchedContainerChangeRequest> entry = - iter.next(); - SchedContainerChangeRequest increaseRequest = - entry.getValue(); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Looking at increase request=" + increaseRequest.toString()); - } - - boolean headroomSatisifed = checkHeadroom(clusterResource, - resourceLimits, increaseRequest.getDeltaCapacity()); - if (!headroomSatisifed) { - // skip if doesn't satisfy headroom limit - if (LOG.isDebugEnabled()) { - LOG.debug(" Headroom is not satisfied, skip.."); - } - continue; - } - - RMContainer rmContainer = increaseRequest.getRMContainer(); - if (rmContainer.getContainerState() != ContainerState.RUNNING) { - // if the container is not running, we should remove the - // increaseRequest and continue; - if (LOG.isDebugEnabled()) { - LOG.debug(" Container is not running any more, skip..."); - } - application.addToBeRemovedIncreaseRequest(increaseRequest); - continue; - } - - if (!Resources.fitsIn(rc, clusterResource, - increaseRequest.getTargetCapacity(), node.getTotalResource())) { - // if the target capacity is more than what the node can offer, we - // will simply remove and skip it. - // The reason of doing check here instead of adding increase request - // to scheduler because node's resource could be updated after - // request added. - if (LOG.isDebugEnabled()) { - LOG.debug(" Target capacity is more than what node can offer," - + " node.resource=" + node.getTotalResource()); - } - application.addToBeRemovedIncreaseRequest(increaseRequest); - continue; - } - - // Try to allocate the increase request - assigned = - allocateIncreaseRequest(node, clusterResource, increaseRequest); - if (assigned.getSkippedType() - == CSAssignment.SkippedType.NONE) { - // When we don't skip this request, which means we either allocated - // OR reserved this request. We will break - break; - } - } - - // We may have allocated something - if (assigned != null && assigned.getSkippedType() - == CSAssignment.SkippedType.NONE) { - break; - } - } - - return assigned == null ? CSAssignment.SKIP_ASSIGNMENT : assigned; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to allocate reserved increase container request.."); - } - - // We already reserved this increase container - SchedContainerChangeRequest request = - sinfo.getIncreaseRequest(nodeId, - reservedContainer.getAllocatedSchedulerKey(), - reservedContainer.getContainerId()); - - // We will cancel the reservation any of following happens - // - Container finished - // - No increase request needed - // - Target resource updated - if (null == request - || reservedContainer.getContainerState() != ContainerState.RUNNING - || (!Resources.equals(reservedContainer.getReservedResource(), - request.getDeltaCapacity()))) { - if (LOG.isDebugEnabled()) { - LOG.debug("We don't need reserved increase container request " - + "for container=" + reservedContainer.getContainerId() - + ". Unreserving and return..."); - } - - // We don't need this container now, just return excessive reservation - return new CSAssignment(application, reservedContainer); - } - - return allocateIncreaseRequestFromReservedContainer(node, clusterResource, - request); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java index ac83d6f..2921e7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java @@ -43,8 +43,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt, // not be included by toRelease list private SchedulerContainer<A, N> allocateFromReservedContainer; - private boolean isIncreasedAllocation; - private NodeType allocationLocalityType; private NodeType requestLocalityType; @@ -57,7 +55,7 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt, SchedulerContainer<A, N> allocatedOrReservedContainer, List<SchedulerContainer<A, N>> toRelease, SchedulerContainer<A, N> allocateFromReservedContainer, - boolean isIncreasedAllocation, NodeType allocationLocalityType, + NodeType allocationLocalityType, NodeType requestLocalityType, SchedulingMode schedulingMode, Resource allocatedResource) { this.allocatedOrReservedContainer = allocatedOrReservedContainer; @@ -65,7 +63,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt, this.toRelease = toRelease; } this.allocateFromReservedContainer = allocateFromReservedContainer; - this.isIncreasedAllocation = isIncreasedAllocation; this.allocationLocalityType = allocationLocalityType; this.requestLocalityType = requestLocalityType; this.schedulingMode = schedulingMode; @@ -84,10 +81,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt, return allocationLocalityType; } - public boolean isIncreasedAllocation() { - return isIncreasedAllocation; - } - public SchedulerContainer<A, N> getAllocateFromReservedContainer() { return allocateFromReservedContainer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 30b7305..fea29bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -312,54 +312,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return false; } - private SchedContainerChangeRequest getResourceChangeRequest( - SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) { - return appSchedulingInfo.getIncreaseRequest( - schedulerContainer.getSchedulerNode().getNodeID(), - schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getRmContainer().getContainerId()); - } - - private boolean checkIncreaseContainerAllocation( - ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation, - SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) { - // When increase a container - if (schedulerContainer.getRmContainer().getState() - != RMContainerState.RUNNING) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to increase a container, but container=" - + schedulerContainer.getRmContainer().getContainerId() - + " is not in running state."); - } - return false; - } - - // Check if increase request is still valid - SchedContainerChangeRequest increaseRequest = getResourceChangeRequest( - schedulerContainer); - - if (null == increaseRequest || !Resources.equals( - increaseRequest.getDeltaCapacity(), - allocation.getAllocatedOrReservedResource())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Increase request has been changed, reject this proposal"); - } - return false; - } - - if (allocation.getAllocateFromReservedContainer() != null) { - // In addition, if allocation is from a reserved container, check - // if the reserved container has enough reserved space - if (!Resources.equals( - allocation.getAllocateFromReservedContainer().getRmContainer() - .getReservedResource(), increaseRequest.getDeltaCapacity())) { - return false; - } - } - - return true; - } - private boolean commonCheckContainerAllocation( Resource cluster, ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation, @@ -445,30 +397,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { schedulerContainer = allocation.getAllocatedOrReservedContainer(); if (schedulerContainer.isAllocated()) { - if (!allocation.isIncreasedAllocation()) { - // When allocate a new container - resourceRequests = - schedulerContainer.getRmContainer().getResourceRequests(); - - // Check pending resource request - if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(), - schedulerContainer.getSchedulerNode(), - schedulerContainer.getSchedulerRequestKey())) { - if (LOG.isDebugEnabled()) { - LOG.debug("No pending resource for: nodeType=" + allocation - .getAllocationLocalityType() + ", node=" + schedulerContainer - .getSchedulerNode() + ", requestKey=" + schedulerContainer - .getSchedulerRequestKey() + ", application=" - + getApplicationAttemptId()); - } - - return false; - } - } else { - if (!checkIncreaseContainerAllocation(allocation, - schedulerContainer)) { - return false; + // When allocate a new container + resourceRequests = + schedulerContainer.getRmContainer().getResourceRequests(); + + // Check pending resource request + if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getSchedulerRequestKey())) { + if (LOG.isDebugEnabled()) { + LOG.debug("No pending resource for: nodeType=" + allocation + .getAllocationLocalityType() + ", node=" + schedulerContainer + .getSchedulerNode() + ", requestKey=" + schedulerContainer + .getSchedulerRequestKey() + ", application=" + + getApplicationAttemptId()); } + + return false; } // Common part of check container allocation regardless if it is a @@ -541,12 +486,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Generate new containerId if it is not an allocation for increasing // Or re-reservation - if (!allocation.isIncreasedAllocation()) { - if (rmContainer.getContainer().getId() == null) { - rmContainer.setContainerId(BuilderUtils - .newContainerId(getApplicationAttemptId(), - getNewContainerId())); - } + if (rmContainer.getContainer().getId() == null) { + rmContainer.setContainerId(BuilderUtils + .newContainerId(getApplicationAttemptId(), + getNewContainerId())); } ContainerId containerId = rmContainer.getContainerId(); @@ -562,77 +505,50 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { schedulerContainer.getSchedulerNode(), reservedContainer); } - // Update this application for the allocated container - if (!allocation.isIncreasedAllocation()) { - // Allocate a new container - addToNewlyAllocatedContainers( - schedulerContainer.getSchedulerNode(), rmContainer); - liveContainers.put(containerId, rmContainer); + // Allocate a new container + addToNewlyAllocatedContainers( + schedulerContainer.getSchedulerNode(), rmContainer); + liveContainers.put(containerId, rmContainer); - // Deduct pending resource requests - List<ResourceRequest> requests = appSchedulingInfo.allocate( - allocation.getAllocationLocalityType(), - schedulerContainer.getSchedulerNode(), - schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getRmContainer().getContainer()); - ((RMContainerImpl) rmContainer).setResourceRequests(requests); + // Deduct pending resource requests + List<ResourceRequest> requests = appSchedulingInfo.allocate( + allocation.getAllocationLocalityType(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getSchedulerRequestKey(), + schedulerContainer.getRmContainer().getContainer()); + ((RMContainerImpl) rmContainer).setResourceRequests(requests); - attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), - allocation.getAllocatedOrReservedResource()); + attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), + allocation.getAllocatedOrReservedResource()); - rmContainer.handle( - new RMContainerEvent(containerId, RMContainerEventType.START)); + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.START)); - // Inform the node - schedulerContainer.getSchedulerNode().allocateContainer( - rmContainer); + // Inform the node + schedulerContainer.getSchedulerNode().allocateContainer( + rmContainer); - // update locality statistics, - incNumAllocatedContainers(allocation.getAllocationLocalityType(), - allocation.getRequestLocalityType()); + // update locality statistics, + incNumAllocatedContainers(allocation.getAllocationLocalityType(), + allocation.getRequestLocalityType()); - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationAttemptId=" + containerId - .getApplicationAttemptId() + " container=" + containerId - + " host=" + rmContainer.getAllocatedNode().getHost() - + " type=" + allocation.getAllocationLocalityType()); - } - RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, - "SchedulerApp", getApplicationId(), containerId, - allocation.getAllocatedOrReservedResource()); - } else{ - SchedContainerChangeRequest increaseRequest = - getResourceChangeRequest(schedulerContainer); - - // allocate resource for an increase request - // Notify node - schedulerContainer.getSchedulerNode().increaseContainer( - increaseRequest.getContainerId(), - increaseRequest.getDeltaCapacity()); - - // OK, we can allocate this increase request - // Notify application - increaseContainer(increaseRequest); + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + containerId + .getApplicationAttemptId() + " container=" + containerId + + " host=" + rmContainer.getAllocatedNode().getHost() + + " type=" + allocation.getAllocationLocalityType()); } + RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, + "SchedulerApp", getApplicationId(), containerId, + allocation.getAllocatedOrReservedResource()); } else { - if (!allocation.isIncreasedAllocation()) { - // If the rmContainer's state is already updated to RESERVED, this is - // a reReservation - reserve(schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getSchedulerNode(), - schedulerContainer.getRmContainer(), - schedulerContainer.getRmContainer().getContainer(), - reReservation); - } else{ - SchedContainerChangeRequest increaseRequest = - getResourceChangeRequest(schedulerContainer); - - reserveIncreasedContainer( - schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getSchedulerNode(), - increaseRequest.getRMContainer(), - increaseRequest.getDeltaCapacity()); - } + // If the rmContainer's state is already updated to RESERVED, this is + // a reReservation + reserve(schedulerContainer.getSchedulerRequestKey(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getRmContainer(), + schedulerContainer.getRmContainer().getContainer(), + reReservation); } } } finally { @@ -649,9 +565,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { FiCaSchedulerNode node, RMContainer rmContainer) { try { writeLock.lock(); - // Cancel increase request (if it has reserved increase request - rmContainer.cancelIncreaseReservation(); - // Done with the reservation? if (internalUnreserve(node, schedulerKey)) { node.unreserveResource(this); @@ -807,13 +720,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { .entrySet()) { NodeId nodeId = entry.getKey(); RMContainer reservedContainer = entry.getValue(); - if (reservedContainer.hasIncreaseReservation()) { - // Currently, only regular container allocation supports continuous - // reservation looking, we don't support canceling increase request - // reservation when allocating regular container. - continue; - } - Resource reservedResource = reservedContainer.getReservedResource(); // make sure we unreserve one with at least the same amount of @@ -869,25 +775,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } } - public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, - FiCaSchedulerNode node, - RMContainer rmContainer, Resource reservedResource) { - // Inform the application - if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer, - reservedResource)) { - - queue.getMetrics().reserveResource(getUser(), reservedResource); - - // Update the node - node.reserveResource(this, schedulerKey, rmContainer); - - // Succeeded - return true; - } - - return false; - } - public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer, Container container, boolean reReservation) { // Update reserved metrics if this is the first reservation @@ -1114,26 +1001,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { toBeRemovedIncRequests.put(request.getContainerId(), request); } - public void removedToBeRemovedIncreaseRequests() { - // Remove invalid in request requests - if (!toBeRemovedIncRequests.isEmpty()) { - try { - writeLock.lock(); - Iterator<Map.Entry<ContainerId, SchedContainerChangeRequest>> iter = - toBeRemovedIncRequests.entrySet().iterator(); - while (iter.hasNext()) { - SchedContainerChangeRequest req = iter.next().getValue(); - appSchedulingInfo.removeIncreaseRequest(req.getNodeId(), - req.getRMContainer().getAllocatedSchedulerKey(), - req.getContainerId()); - iter.remove(); - } - } finally { - writeLock.unlock(); - } - } - } - /* * Overriding to appease findbugs */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 8eac929..c26a11b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -153,20 +153,6 @@ public class FiCaSchedulerNode extends SchedulerNode { } } - @Override - protected synchronized void changeContainerResource(ContainerId containerId, - Resource deltaResource, boolean increase) { - super.changeContainerResource(containerId, deltaResource, increase); - - if (killableContainers.containsKey(containerId)) { - if (increase) { - Resources.addTo(totalKillableResources, deltaResource); - } else { - Resources.subtractFrom(totalKillableResources, deltaResource); - } - } - } - public synchronized Resource getTotalKillableResources() { return totalKillableResources; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3246778..0599414 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -819,9 +819,7 @@ public class FairScheduler extends } // Handle promotions and demotions - handleExecutionTypeUpdates( - application, updateRequests.getPromotionRequests(), - updateRequests.getDemotionRequests()); + handleContainerUpdates(application, updateRequests); // Sanity check normalizeRequests(ask); @@ -1769,13 +1767,6 @@ public class FairScheduler extends return targetQueueName; } - @Override - protected void decreaseContainer( - SchedContainerChangeRequest decreaseRequest, - SchedulerApplicationAttempt attempt) { - // TODO Auto-generated method stub - } - public float getReservableNodesRatio() { return reservableNodesRatio; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 64dbc7d..a8d4f48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -937,14 +937,6 @@ public class FifoScheduler extends } @Override - protected void decreaseContainer( - SchedContainerChangeRequest decreaseRequest, - SchedulerApplicationAttempt attempt) { - // TODO Auto-generated method stub - - } - - @Override protected synchronized void nodeUpdate(RMNode nm) { super.nodeUpdate(nm); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 899523c..e34665d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -137,11 +137,11 @@ public class TestChildQueueOrder { final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource, RMNodeLabelsManager.NO_LABEL, false); + allocatedResource, RMNodeLabelsManager.NO_LABEL); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource, null, null, false); + allocatedResource, null, null); } // Next call - nothing http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 0696f57..b4b05ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -88,18 +88,6 @@ public class TestContainerResizing { } @Override - protected void decreaseContainers( - List<UpdateContainerRequest> decreaseRequests, - SchedulerApplicationAttempt attempt) { - try { - Thread.sleep(1000); - } catch(InterruptedException e) { - LOG.debug("Thread interrupted."); - } - super.decreaseContainers(decreaseRequests, attempt); - } - - @Override public CSAssignment allocateContainersToNode( PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { try { @@ -288,13 +276,9 @@ public class TestContainerResizing { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - - RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); /* Check reservation statuses */ // Increase request should be reserved - Assert.assertTrue(rmContainer1.hasIncreaseReservation()); - Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize()); Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied @@ -319,7 +303,6 @@ public class TestContainerResizing { /* Check statuses after reservation satisfied */ // Increase request should be unreserved - Assert.assertFalse(rmContainer1.hasIncreaseReservation()); Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will be changed since it's satisfied @@ -391,11 +374,8 @@ public class TestContainerResizing { RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId2); - /* Check reservation statuses */ // Increase request should *NOT* be reserved as it exceeds user limit - Assert.assertFalse(rmContainer1.hasIncreaseReservation()); Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied @@ -471,13 +451,9 @@ public class TestContainerResizing { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - - RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); /* Check reservation statuses */ // Increase request should be reserved - Assert.assertTrue(rmContainer1.hasIncreaseReservation()); - Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize()); Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied @@ -510,7 +486,6 @@ public class TestContainerResizing { // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); - Assert.assertFalse(rmContainer1.hasIncreaseReservation()); // Pending resource will be changed since it's satisfied checkPendingResource(rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, @@ -585,13 +560,9 @@ public class TestContainerResizing { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - - RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); /* Check reservation statuses */ // Increase request should be reserved - Assert.assertTrue(rmContainer1.hasIncreaseReservation()); - Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize()); Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied @@ -614,7 +585,7 @@ public class TestContainerResizing { am1.sendContainerResizingRequest(Arrays.asList( UpdateContainerRequest .newInstance(0, containerId1, - ContainerUpdateType.INCREASE_RESOURCE, + ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(1 * GB), null))); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -623,7 +594,6 @@ public class TestContainerResizing { // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); - Assert.assertFalse(rmContainer1.hasIncreaseReservation()); // Pending resource will be changed since it's satisfied checkPendingResource(rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, @@ -698,12 +668,8 @@ public class TestContainerResizing { RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2); - /* Check reservation statuses */ // Increase request should be reserved - Assert.assertTrue(rmContainer2.hasIncreaseReservation()); - Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize()); Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied @@ -721,12 +687,13 @@ public class TestContainerResizing { // Complete container2, container will be unreserved and completed am1.allocate(null, Arrays.asList(containerId2)); - + + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + am1.allocate(null, null); /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); - Assert.assertFalse(rmContainer2.hasIncreaseReservation()); // Pending resource will be changed since it's satisfied checkPendingResource(rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, @@ -796,12 +763,8 @@ public class TestContainerResizing { RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2); - /* Check reservation statuses */ // Increase request should be reserved - Assert.assertTrue(rmContainer2.hasIncreaseReservation()); - Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize()); Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied @@ -825,11 +788,11 @@ public class TestContainerResizing { // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); - Assert.assertFalse(rmContainer2.hasIncreaseReservation()); // Pending resource will be changed since it's satisfied + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + checkPendingResource(rm1, "default", 0 * GB, null); - Assert.assertEquals(0 * GB, - app.getAppAttemptResourceUsage().getPending().getMemorySize()); + // Queue/user/application's usage will be updated checkUsedResource(rm1, "default", 0 * GB, null); // User will be removed @@ -949,89 +912,6 @@ public class TestContainerResizing { rm1.close(); } - @Test - public void testIncreaseContainerRequestGetPreferrence() - throws Exception { - /** - * There're multiple containers need to be increased, and there're several - * container allocation request, scheduler will try to increase container - * before allocate new containers - */ - MockRM rm1 = new MockRM() { - @Override - public RMNodeLabelsManager createNodeLabelManager() { - return mgr; - } - }; - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); - - // app1 -> a1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( - rm1, app1.getApplicationId()); - ApplicationAttemptId attemptId = am1.getApplicationAttemptId(); - - // Container 2, 3 (priority=3) - allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2); - - // Container 4, 5 (priority=2) - allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4); - - // Container 6, 7 (priority=4) - allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6); - - // am1 asks to change its container[2-7] from 1G to 2G - List<UpdateContainerRequest> increaseRequests = new ArrayList<>(); - for (int cId = 2; cId <= 7; cId++) { - ContainerId containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), cId); - increaseRequests.add(UpdateContainerRequest - .newInstance(0, containerId, - ContainerUpdateType.INCREASE_RESOURCE, - Resources.createResource(2 * GB), null)); - } - am1.sendContainerResizingRequest(increaseRequests); - - checkPendingResource(rm1, "default", 6 * GB, null); - Assert.assertEquals(6 * GB, - app.getAppAttemptResourceUsage().getPending().getMemorySize()); - - // Get rmNode1 - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - // assignContainer, container-4/5/2 increased (which has highest priority OR - // earlier allocated) - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - AllocateResponse allocateResponse = am1.allocate(null, null); - Assert.assertEquals(3, allocateResponse.getUpdatedContainers().size()); - verifyContainerIncreased(allocateResponse, - ContainerId.newContainerId(attemptId, 4), 2 * GB); - verifyContainerIncreased(allocateResponse, - ContainerId.newContainerId(attemptId, 5), 2 * GB); - verifyContainerIncreased(allocateResponse, - ContainerId.newContainerId(attemptId, 2), 2 * GB); - - /* Check statuses after allocation */ - // There're still 3 pending increase requests - checkPendingResource(rm1, "default", 3 * GB, null); - Assert.assertEquals(3 * GB, - app.getAppAttemptResourceUsage().getPending().getMemorySize()); - // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 10 * GB, null); - Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default")) - .getUser("user").getUsed().getMemorySize()); - Assert.assertEquals(0 * GB, - app.getAppAttemptResourceUsage().getReserved().getMemorySize()); - Assert.assertEquals(10 * GB, - app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); - } - @Test (timeout = 60000) public void testDecreaseContainerWillNotDeadlockContainerAllocation() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index 74cecf2..184e854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -200,6 +200,7 @@ public class TestIncreaseAllocationExpirer { // back action to complete Thread.sleep(10000); // Verify container size is 1G + am1.allocate(null, null); Assert.assertEquals( 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) .getAllocatedResource().getMemorySize()); @@ -304,6 +305,8 @@ public class TestIncreaseAllocationExpirer { // Wait long enough for the second token (5G) to expire, and verify that // the roll back action is completed as expected Thread.sleep(10000); + am1.allocate(null, null); + Thread.sleep(2000); // Verify container size is rolled back to 3G Assert.assertEquals( 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) @@ -400,13 +403,13 @@ public class TestIncreaseAllocationExpirer { // Decrease containers List<UpdateContainerRequest> decreaseRequests = new ArrayList<>(); decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId2, - ContainerUpdateType.INCREASE_RESOURCE, + ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(2 * GB), null)); decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId3, - ContainerUpdateType.INCREASE_RESOURCE, + ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(4 * GB), null)); decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId4, - ContainerUpdateType.INCREASE_RESOURCE, + ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(4 * GB), null)); AllocateResponse response = am1.sendContainerResizingRequest(decreaseRequests); @@ -418,6 +421,9 @@ public class TestIncreaseAllocationExpirer { rm1, containerId4, Resources.createResource(6 * GB))); // Wait for containerId3 token to expire, Thread.sleep(10000); + + am1.allocate(null, null); + Assert.assertEquals( 2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) .getAllocatedResource().getMemorySize()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index ec1b84d..3fbbae3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1061,11 +1061,11 @@ public class TestLeafQueue { qb.releaseResource(clusterResource, app_0, app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey) .getPerAllocationResource(), - null, null, false); + null, null); qb.releaseResource(clusterResource, app_2, app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey) .getPerAllocationResource(), - null, null, false); + null, null); qb.setUserLimit(50); qb.setUserLimitFactor(1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 11fea82..c4b7a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -171,11 +171,11 @@ public class TestParentQueue { final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource, RMNodeLabelsManager.NO_LABEL, false); + allocatedResource, RMNodeLabelsManager.NO_LABEL); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource, null, null, false); + allocatedResource, null, null); } // Next call - nothing --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org