http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
new file mode 100644
index 0000000..7381250
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class encapsulates all outstanding container increase and decrease
+ * requests for an application.
+ */
+public class ContainerUpdateContext {
+
+  /**
+   * Sentinel returned by {@link #matchContainerToOutstandingIncreaseReq}
+   * when outstanding increases exist for the allocation's scheduler key but
+   * none matches the allocated container's resource and node.
+   */
+  public static final ContainerId UNDEFINED =
+      ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+          ApplicationId.newInstance(-1, -1), -1), -1);
+  protected static final RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  // Keep track of containers that are undergoing promotion, indexed by
+  // scheduler key, then by the container's resource, then by its node.
+  private final Map<SchedulerRequestKey, Map<Resource,
+      Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
+
+  // Ids of containers with an outstanding decrease.
+  private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+  private final AppSchedulingInfo appSchedulingInfo;
+
+  ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
+    this.appSchedulingInfo = appSchedulingInfo;
+  }
+
+  /**
+   * Check whether the container is tracked as an outstanding increase.
+   * @param container Container.
+   * @return true if the container has an outstanding increase.
+   */
+  private synchronized boolean isBeingIncreased(Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(
+            new SchedulerRequestKey(container.getPriority(),
+                container.getAllocationRequestId(), container.getId()));
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        return containerIds != null
+            && containerIds.contains(container.getId());
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Add the container to outstanding decreases.
+   * @param container Container.
+   * @return true if the container was added to outstanding decreases; false
+   *         if it already has an outstanding increase or decrease.
+   */
+  public synchronized boolean checkAndAddToOutstandingDecreases(
+      Container container) {
+    if (isBeingIncreased(container)
+        || outstandingDecreases.contains(container.getId())) {
+      return false;
+    }
+    outstandingDecreases.add(container.getId());
+    return true;
+  }
+
+  /**
+   * Add the container to outstanding increases and register the resource
+   * requests needed to satisfy the increase.
+   * @param rmContainer RMContainer.
+   * @param schedulerNode SchedulerNode.
+   * @param updateRequest UpdateContainerRequest.
+   * @return true if the container was added to outstanding increases; false
+   *         if it already has an outstanding increase or decrease.
+   */
+  public synchronized boolean checkAndAddToOutstandingIncreases(
+      RMContainer rmContainer, SchedulerNode schedulerNode,
+      UpdateContainerRequest updateRequest) {
+    Container container = rmContainer.getContainer();
+    // Reject before touching outstandingIncreases: a refused request must
+    // not leave an empty entry behind. Otherwise
+    // matchContainerToOutstandingIncreaseReq could treat allocations for
+    // this scheduler key as spurious, and removeFromOutstandingUpdate (which
+    // skipped empty id sets) would never clean the entry up.
+    if (outstandingDecreases.contains(container.getId())) {
+      return false;
+    }
+    SchedulerRequestKey schedulerKey =
+        SchedulerRequestKey.create(updateRequest,
+            rmContainer.getAllocatedSchedulerKey());
+    Set<ContainerId> containerIds = outstandingIncreases
+        .computeIfAbsent(schedulerKey, k -> new HashMap<>())
+        .computeIfAbsent(container.getResource(), k -> new HashMap<>())
+        .computeIfAbsent(container.getNodeId(), k -> new HashSet<>());
+    if (!containerIds.add(container.getId())) {
+      // Already has an outstanding increase.
+      return false;
+    }
+
+    Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+        new HashMap<>();
+    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
+    Map<String, ResourceRequest> resMap =
+        createResourceRequests(rmContainer, schedulerNode,
+            schedulerKey, resToIncrease);
+    updateResReqs.put(schedulerKey, resMap);
+    appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    return true;
+  }
+
+  /**
+   * Build host, rack and ANY level resource requests for an update.
+   * @param rmContainer container being updated; supplies the host name.
+   * @param schedulerNode node whose rack name is used for the rack request.
+   * @param schedulerKey scheduler key of the update.
+   * @param resToIncrease capability to request.
+   * @return the requests, keyed by resource name.
+   */
+  private Map<String, ResourceRequest> createResourceRequests(
+      RMContainer rmContainer, SchedulerNode schedulerNode,
+      SchedulerRequestKey schedulerKey, Resource resToIncrease) {
+    String host = rmContainer.getContainer().getNodeId().getHost();
+    String rack = schedulerNode.getRackName();
+    Map<String, ResourceRequest> resMap = new HashMap<>();
+    resMap.put(host, createResourceReqForIncrease(schedulerKey,
+        resToIncrease, RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+        rmContainer, host));
+    resMap.put(rack, createResourceReqForIncrease(schedulerKey,
+        resToIncrease, RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+        rmContainer, rack));
+    resMap.put(ResourceRequest.ANY, createResourceReqForIncrease(schedulerKey,
+        resToIncrease, RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+        rmContainer, ResourceRequest.ANY));
+    return resMap;
+  }
+
+  /**
+   * Resource to request for the update.
+   * @param updateReq the update request.
+   * @param rmContainer container being updated.
+   * @return the container's current resource for a promotion; null for any
+   *         other update type (see the TODO below).
+   */
+  private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
+      RMContainer rmContainer) {
+    if (updateReq.getContainerUpdateType() ==
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
+      return rmContainer.getContainer().getResource();
+    }
+    // TODO: Fix this for container increase..
+    //       This has to equal the Resources in excess of fitsIn()
+    //       for container increase and is equal to the container total
+    //       resource for Promotion.
+    return null;
+  }
+
+  /**
+   * Fill {@code rr} as a single-container, locality-strict GUARANTEED
+   * request for the given resource name.
+   * @return the populated {@code rr}.
+   */
+  private static ResourceRequest createResourceReqForIncrease(
+      SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
+      ResourceRequest rr, RMContainer rmContainer, String resourceName) {
+    rr.setResourceName(resourceName);
+    rr.setNumContainers(1);
+    rr.setRelaxLocality(false);
+    rr.setPriority(rmContainer.getContainer().getPriority());
+    rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
+    rr.setCapability(resToIncrease);
+    rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
+    rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
+        ExecutionType.GUARANTEED, true));
+    return rr;
+  }
+
+  /**
+   * Remove Container from outstanding increases / decreases. Calling this
+   * method essentially completes the update process.
+   * @param schedulerKey SchedulerRequestKey.
+   * @param container Container.
+   */
+  public synchronized void removeFromOutstandingUpdate(
+      SchedulerRequestKey schedulerKey, Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null) {
+          containerIds.remove(container.getId());
+          if (containerIds.isEmpty()) {
+            locationMap.remove(container.getNodeId());
+          }
+        }
+        if (locationMap.isEmpty()) {
+          resourceMap.remove(container.getResource());
+        }
+      }
+      if (resourceMap.isEmpty()) {
+        outstandingIncreases.remove(schedulerKey);
+      }
+    }
+    outstandingDecreases.remove(container.getId());
+  }
+
+  /**
+   * Check if a new container is to be matched up against an outstanding
+   * Container increase request.
+   * @param node SchedulerNode supplied by the caller; its rack name is used
+   *          if the update's resource requests have to be added back.
+   * @param schedulerKey SchedulerRequestKey.
+   * @param rmContainer RMContainer.
+   * @return the id of the container being updated if one matches; {@link
+   *         #UNDEFINED} if increases are outstanding for the key but none
+   *         matches (the requests are added back); null if there are no
+   *         outstanding increases for the key.
+   */
+  public synchronized ContainerId matchContainerToOutstandingIncreaseReq(
+      SchedulerNode node, SchedulerRequestKey schedulerKey,
+      RMContainer rmContainer) {
+    ContainerId retVal = null;
+    Container container = rmContainer.getContainer();
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          retVal = containerIds.iterator().next();
+        }
+      }
+    }
+    // Allocation happened on NM on the same host, but not on the NM
+    // we need.. We need to signal that this container has to be released.
+    // We also need to add these requests back.. to be reallocated.
+    if (resourceMap != null && retVal == null) {
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> reqsToUpdate =
+          new HashMap<>();
+      Map<String, ResourceRequest> resMap = createResourceRequests(
+          rmContainer, node, schedulerKey,
+          rmContainer.getContainer().getResource());
+      reqsToUpdate.put(schedulerKey, resMap);
+      appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
+      return UNDEFINED;
+    }
+    return retVal;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java
new file mode 100644
index 0000000..77b1545
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holder class that maintains lists of container update requests.
+ */
+public class ContainerUpdates {
+
+  final List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
+  final List<UpdateContainerRequest> decreaseRequests = new ArrayList<>();
+  final List<UpdateContainerRequest> promotionRequests = new ArrayList<>();
+  final List<UpdateContainerRequest> demotionRequests = new ArrayList<>();
+
+  /**
+   * Returns Container Increase Requests.
+   * @return Container Increase Requests.
+   */
+  public List<UpdateContainerRequest> getIncreaseRequests() {
+    return increaseRequests;
+  }
+
+  /**
+   * Returns Container Decrease Requests.
+   * @return Container Decrease Requests.
+   */
+  public List<UpdateContainerRequest> getDecreaseRequests() {
+    return decreaseRequests;
+  }
+
+  /**
+   * Returns Container Promotion Requests.
+   * @return Container Promotion Requests.
+   */
+  public List<UpdateContainerRequest> getPromotionRequests() {
+    return promotionRequests;
+  }
+
+  /**
+   * Returns Container Demotion Requests.
+   * @return Container Demotion Requests.
+   */
+  public List<UpdateContainerRequest> getDemotionRequests() {
+    return demotionRequests;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index e94d800..4a8b2da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -133,10 +136,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
 
   protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+  protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
+  protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
+  protected List<RMContainer> tempContainerToKill = new ArrayList<>();
   protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
   protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
   protected Set<NMToken> updatedNMTokens = new HashSet<>();
 
+  protected List<UpdateContainerError> updateContainerErrors = new ArrayList<>();
+
   // This pendingRelease is used in work-preserving recovery scenario to keep
   // track of the AM's outstanding release requests. RM on recovery could
   // receive the release request form AM before it receives the container status
@@ -247,6 +255,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return this.appSchedulingInfo;
   }
 
+  public ContainerUpdateContext getUpdateContext() {
+    return this.appSchedulingInfo.getUpdateContext();
+  }
+
   /**
    * Is this application pending?
    * @return true if it is else false.
@@ -537,8 +549,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       writeLock.lock();
       // Create RMContainer if necessary
       if (rmContainer == null) {
-        rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
-            node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+        rmContainer = new RMContainerImpl(container, schedulerKey,
+            getApplicationAttemptId(), node.getNodeID(),
+            appSchedulingInfo.getUser(), rmContext);
       }
       if (rmContainer.getState() == RMContainerState.NEW) {
         attemptResourceUsage.incReserved(node.getPartition(),
@@ -635,10 +648,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   }
 
   private Container updateContainerAndNMToken(RMContainer rmContainer,
-      boolean newContainer, boolean increasedContainer) {
+      ContainerUpdateType updateType) {
     Container container = rmContainer.getContainer();
     ContainerType containerType = ContainerType.TASK;
-    if (!newContainer) {
+    if (updateType != null) {
       container.setVersion(container.getVersion() + 1);
     }
     // The working knowledge is that masterContainer for AM is null as it
@@ -662,12 +675,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       return null;
     }
 
-    if (newContainer) {
+    if (updateType == null ||
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType ||
+        ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
       rmContainer.handle(new RMContainerEvent(
           rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
     } else {
       rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
-          rmContainer.getContainerId(), increasedContainer));
+          rmContainer.getContainerId(),
+          ContainerUpdateType.INCREASE_RESOURCE == updateType));
     }
     return container;
   }
@@ -699,8 +715,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       Iterator<RMContainer> i = newlyAllocatedContainers.iterator();
       while (i.hasNext()) {
         RMContainer rmContainer = i.next();
-        Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            true, false);
+        Container updatedContainer =
+            updateContainerAndNMToken(rmContainer, null);
         // Only add container to return list when it's not null.
         // updatedContainer could be null when generate token failed, it can be
         // caused by DNS resolving failed.
@@ -713,9 +729,153 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     } finally {
       writeLock.unlock();
     }
+  }
 
+  public void addToNewlyDemotedContainers(ContainerId containerId,
+      RMContainer rmContainer) {
+    newlyDemotedContainers.put(containerId, rmContainer);
   }
-  
+
+  protected synchronized void addToUpdateContainerErrors(
+      UpdateContainerError error) {
+    updateContainerErrors.add(error);
+  }
+
+  protected synchronized void addToNewlyAllocatedContainers(
+      SchedulerNode node, RMContainer rmContainer) {
+    if (oppContainerContext == null) {
+      newlyAllocatedContainers.add(rmContainer);
+      return;
+    }
+    ContainerId matchedContainerId =
+        getUpdateContext().matchContainerToOutstandingIncreaseReq(
+            node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
+    if (matchedContainerId != null) {
+      if (ContainerUpdateContext.UNDEFINED == matchedContainerId) {
+        // This is a spurious allocation (relaxLocality = false
+        // resulted in the Container being allocated on an NM on the same host
+        // but not on the NM running the container to be updated. Can
+        // happen if more than one NM exists on the same host.. usually
+        // occurs when using MiniYARNCluster to test).
+        tempContainerToKill.add(rmContainer);
+      } else {
+        newlyPromotedContainers.put(matchedContainerId, rmContainer);
+      }
+    } else {
+      newlyAllocatedContainers.add(rmContainer);
+    }
+  }
+
+  public List<Container> pullNewlyPromotedContainers() {
+    return pullContainersWithUpdatedExecType(newlyPromotedContainers,
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+  }
+
+  public List<Container> pullNewlyDemotedContainers() {
+    return pullContainersWithUpdatedExecType(newlyDemotedContainers,
+        ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
+  }
+
+  public synchronized List<UpdateContainerError> pullUpdateContainerErrors() {
+    List<UpdateContainerError> errors =
+        new ArrayList<>(updateContainerErrors);
+    updateContainerErrors.clear();
+    return errors;
+  }
+
+  /**
+   * A container is promoted if its executionType is changed from
+   * OPPORTUNISTIC to GUARANTEED. It is demoted if the change is from
+   * GUARANTEED to OPPORTUNISTIC.
+   * @param newlyUpdatedContainers map from the id of the existing container
+   *          to the container allocated for the update; every entry is
+   *          removed by this call.
+   * @param updateType the kind of execution type update being applied.
+   * @return Newly Promoted and Demoted containers
+   */
+  private List<Container> pullContainersWithUpdatedExecType(
+      Map<ContainerId, RMContainer> newlyUpdatedContainers,
+      ContainerUpdateType updateType) {
+    List<Container> updatedContainers = new ArrayList<>();
+    if (oppContainerContext == null) {
+      return updatedContainers;
+    }
+    try {
+      writeLock.lock();
+      Iterator<Map.Entry<ContainerId, RMContainer>> i =
+          newlyUpdatedContainers.entrySet().iterator();
+      while (i.hasNext()) {
+        Map.Entry<ContainerId, RMContainer> entry = i.next();
+        ContainerId matchedContainerId = entry.getKey();
+        RMContainer rmContainer = entry.getValue();
+
+        // swap containers
+        RMContainer existingRMContainer = swapContainer(
+            rmContainer, matchedContainerId);
+        // swapContainer returns null if getRMContainer no longer finds the
+        // container being updated; skip it rather than throwing an NPE.
+        if (existingRMContainer != null) {
+          getUpdateContext().removeFromOutstandingUpdate(
+              rmContainer.getAllocatedSchedulerKey(),
+              existingRMContainer.getContainer());
+          Container updatedContainer = updateContainerAndNMToken(
+              existingRMContainer, updateType);
+          // updateContainerAndNMToken returns null if token generation fails.
+          if (updatedContainer != null) {
+            updatedContainers.add(updatedContainer);
+          }
+        }
+
+        tempContainerToKill.add(rmContainer);
+        i.remove();
+      }
+      // Release all temporary containers
+      Iterator<RMContainer> tempIter = tempContainerToKill.iterator();
+      while (tempIter.hasNext()) {
+        RMContainer c = tempIter.next();
+        // Mark container for release (set RRs to null, so RM does not think
+        // it is a recoverable container)
+        ((RMContainerImpl) c).setResourceRequests(null);
+        ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
+            SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
+                SchedulerUtils.UPDATED_CONTAINER),
+            RMContainerEventType.KILL);
+        tempIter.remove();
+      }
+      return updatedContainers;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private RMContainer swapContainer(RMContainer rmContainer, ContainerId
+      matchedContainerId) {
+    RMContainer existingRMContainer =
+        getRMContainer(matchedContainerId);
+    if (existingRMContainer != null) {
+      // Swap updated container with the existing container
+      Container updatedContainer = rmContainer.getContainer();
+
+      Container newContainer = Container.newInstance(matchedContainerId,
+          existingRMContainer.getContainer().getNodeId(),
+          existingRMContainer.getContainer().getNodeHttpAddress(),
+          updatedContainer.getResource(),
+          existingRMContainer.getContainer().getPriority(), null,
+          updatedContainer.getExecutionType());
+      newContainer.setAllocationRequestId(
+          existingRMContainer.getContainer().getAllocationRequestId());
+      newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+      rmContainer.getContainer().setResource(
+          existingRMContainer.getContainer().getResource());
+      rmContainer.getContainer().setExecutionType(
+          existingRMContainer.getContainer().getExecutionType());
+
+      ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+    }
+    return existingRMContainer;
+  }
+
   private List<Container> pullNewlyUpdatedContainers(
       Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
     try {
@@ -728,7 +888,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       while (i.hasNext()) {
         RMContainer rmContainer = i.next().getValue();
         Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            false, increase);
+            increase ? ContainerUpdateType.INCREASE_RESOURCE :
+                ContainerUpdateType.DECREASE_RESOURCE);
         if (updatedContainer != null) {
           returnContainerList.add(updatedContainer);
           i.remove();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index b227523..5360665 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -41,7 +42,10 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -57,6 +61,9 @@ public class SchedulerUtils {
 
   public static final String RELEASED_CONTAINER =
       "Container released by application";
+
+  public static final String UPDATED_CONTAINER =
+      "Temporary container killed by application for ExecutionType update";
   
   public static final String LOST_CONTAINER =
       "Container released on a *lost* node";
@@ -376,4 +383,28 @@ public class SchedulerUtils {
     }
     return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster);
   }
+
+  /**
+   * Create an RMContainer for the given container, register it with the
+   * container's current application attempt and allocate it on the
+   * container's scheduler node.
+   * @param rmContext RM context.
+   * @param container Container.
+   * @param isRemotelyAllocated passed through to the RMContainer.
+   * @return the new RMContainer.
+   */
+  public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
+      Container container, boolean isRemotelyAllocated) {
+    SchedulerApplicationAttempt appAttempt =
+        ((AbstractYarnScheduler) rmContext.getScheduler())
+            .getCurrentAttemptForContainer(container.getId());
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container),
+        appAttempt.getApplicationAttemptId(), container.getNodeId(),
+        appAttempt.getUser(), rmContext, isRemotelyAllocated);
+    appAttempt.addRMContainer(container.getId(), rmContainer);
+    ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+        container.getNodeId()).allocateContainer(rmContainer);
+    return rmContainer;
+  }
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index ea1ae60..3185dc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -42,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -137,8 +136,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * @param release
    * @param blacklistAdditions
    * @param blacklistRemovals
-   * @param increaseRequests
-   * @param decreaseRequests
+   * @param updateRequests increase, decrease, promotion and demotion requests
    * @return the {@link Allocation} for the application
    */
   @Public
@@ -146,8 +144,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests);
+      ContainerUpdates updateRequests);
 
   /**
    * Get node resource usage report.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index b862850..55ffe25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -921,22 +922,27 @@ public class CapacityScheduler extends
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests) {
+      ContainerUpdates updateRequests) {
     FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       return EMPTY_ALLOCATION;
     }
 
+    // Handle promotions and demotions
+    handleExecutionTypeUpdates(
+        application, updateRequests.getPromotionRequests(),
+        updateRequests.getDemotionRequests());
+
     // Release containers
     releaseContainers(release, application);
 
     // update increase requests
-    LeafQueue updateDemandForQueue = updateIncreaseRequests(increaseRequests,
+    LeafQueue updateDemandForQueue =
+        updateIncreaseRequests(updateRequests.getIncreaseRequests(),
             application);
 
     // Decrease containers
-    decreaseContainers(decreaseRequests, application);
+    decreaseContainers(updateRequests.getDecreaseRequests(), application);
 
     // Sanity check for new allocation requests
     normalizeRequests(ask);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1eb48bb..eeb0815 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -746,7 +746,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // When reserving container
     RMContainer updatedContainer = reservedContainer;
     if (updatedContainer == null) {
-      updatedContainer = new RMContainerImpl(container,
+      updatedContainer = new RMContainerImpl(container, schedulerKey,
           application.getApplicationAttemptId(), node.getNodeID(),
           application.getAppSchedulingInfo().getUser(), rmContext);
     }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index b14bc20..809446f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -222,7 +222,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, + RMContainer rmContainer = new RMContainerImpl(container, schedulerKey, this.getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, request.getNodeLabelExpression()); @@ -554,12 +554,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Update this application for the allocated container if (!allocation.isIncreasedAllocation()) { // Allocate a new container - newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers( + schedulerContainer.getSchedulerNode(), rmContainer); liveContainers.put(containerId, rmContainer); // Deduct pending resource requests List<ResourceRequest> requests = appSchedulingInfo.allocate( - allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(), + allocation.getAllocationLocalityType(), + schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerRequestKey(), 
schedulerContainer.getRmContainer().getContainer()); ((RMContainerImpl) rmContainer).setResourceRequests(requests); @@ -751,12 +753,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers(); List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers(); List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers(); + List<Container> newlyPromotedContainers = pullNewlyPromotedContainers(); + List<Container> newlyDemotedContainers = pullNewlyDemotedContainers(); List<NMToken> updatedNMTokens = pullUpdatedNMTokens(); Resource headroom = getHeadroom(); setApplicationHeadroomForMetrics(headroom); return new Allocation(newlyAllocatedContainers, headroom, null, currentContPreemption, Collections.singletonList(rr), updatedNMTokens, - newlyIncreasedContainers, newlyDecreasedContainers); + newlyIncreasedContainers, newlyDecreasedContainers, + newlyPromotedContainers, newlyDemotedContainers); } finally { writeLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index a9591a5..0daa8a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -448,13 +448,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // Create RMContainer - rmContainer = new RMContainerImpl(container, + rmContainer = new RMContainerImpl(container, schedulerKey, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers(node, rmContainer); liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e3af150..78dfe61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -18,18 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.io.IOException; -import 
java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -51,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -80,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -102,8 +92,17 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import 
com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * A scheduler that schedules resources between a set of queues. The scheduler @@ -812,8 +811,7 @@ public class FairScheduler extends public Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<UpdateContainerRequest> increaseRequests, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); @@ -823,6 +821,11 @@ public class FairScheduler extends return EMPTY_ALLOCATION; } + // Handle promotions and demotions + handleExecutionTypeUpdates( + application, updateRequests.getPromotionRequests(), + updateRequests.getDemotionRequests()); + // Sanity check normalizeRequests(ask); @@ -879,7 +882,9 @@ public class FairScheduler extends application.setApplicationHeadroomForMetrics(headroom); return new Allocation(newlyAllocatedContainers, headroom, preemptionContainerIds, null, null, - application.pullUpdatedNMTokens()); + application.pullUpdatedNMTokens(), null, null, + application.pullNewlyPromotedContainers(), + application.pullNewlyDemotedContainers()); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index e60f70e..fa61710 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -68,7 +68,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp { // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, - this.getApplicationAttemptId(), node.getNodeID(), + schedulerKey, this.getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, request.getNodeLabelExpression()); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); @@ -76,7 +76,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp { updateAMContainerDiagnostics(AMState.ASSIGNED, null); // Add it to allContainers list. 
- newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers(node, rmContainer); ContainerId containerId = container.getId(); liveContainers.put(containerId, rmContainer); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 643199c..f4ab9c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -71,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -326,8 +326,7 @@ public class FifoScheduler extends public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<UpdateContainerRequest> increaseRequests, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { FifoAppAttempt application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java index 9dbf024..157518e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java @@ -157,7 +157,8 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> return resourceRequestMap.get(resourceName); } - private void decrementOutstanding(ResourceRequest offSwitchRequest) { + private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey, + ResourceRequest offSwitchRequest) { int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; // Do not remove ANY @@ -166,8 +167,6 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> // Do we have any outstanding requests? // If there is nothing, we need to deactivate this application if (numOffSwitchContainers == 0) { - SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create( - offSwitchRequest); appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey); appSchedulingInfo.checkForDeactivation(); } @@ -177,11 +176,15 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> offSwitchRequest.getCapability()); } - private ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = - ResourceRequest.newInstance(request.getPriority(), - request.getResourceName(), request.getCapability(), 1, - request.getRelaxLocality(), request.getNodeLabelExpression()); + public ResourceRequest cloneResourceRequest(ResourceRequest request) { + ResourceRequest newRequest = ResourceRequest.newBuilder() + .priority(request.getPriority()) + .allocationRequestId(request.getAllocationRequestId()) + .resourceName(request.getResourceName()) + .capability(request.getCapability()) + .numContainers(1) + .relaxLocality(request.getRelaxLocality()) + .nodeLabelExpression(request.getNodeLabelExpression()).build(); return newRequest; } @@ -189,15 +192,15 @@ public class LocalitySchedulingPlacementSet<N extends 
SchedulerNode> * The {@link ResourceScheduler} is allocating data-local resources to the * application. */ - private void allocateRackLocal(SchedulerNode node, - ResourceRequest rackLocalRequest, + private void allocateRackLocal(SchedulerRequestKey schedulerKey, + SchedulerNode node, ResourceRequest rackLocalRequest, List<ResourceRequest> resourceRequests) { // Update future requirements decResourceRequest(node.getRackName(), rackLocalRequest); ResourceRequest offRackRequest = resourceRequestMap.get( ResourceRequest.ANY); - decrementOutstanding(offRackRequest); + decrementOutstanding(schedulerKey, offRackRequest); // Update cloned RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(rackLocalRequest)); @@ -208,10 +211,11 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> * The {@link ResourceScheduler} is allocating data-local resources to the * application. */ - private void allocateOffSwitch(ResourceRequest offSwitchRequest, + private void allocateOffSwitch(SchedulerRequestKey schedulerKey, + ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) { // Update future requirements - decrementOutstanding(offSwitchRequest); + decrementOutstanding(schedulerKey, offSwitchRequest); // Update cloned OffRack requests for recovery resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } @@ -221,8 +225,8 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> * The {@link ResourceScheduler} is allocating data-local resources to the * application. 
*/ - private void allocateNodeLocal(SchedulerNode node, - ResourceRequest nodeLocalRequest, + private void allocateNodeLocal(SchedulerRequestKey schedulerKey, + SchedulerNode node, ResourceRequest nodeLocalRequest, List<ResourceRequest> resourceRequests) { // Update future requirements decResourceRequest(node.getNodeName(), nodeLocalRequest); @@ -233,7 +237,7 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> ResourceRequest offRackRequest = resourceRequestMap.get( ResourceRequest.ANY); - decrementOutstanding(offRackRequest); + decrementOutstanding(schedulerKey, offRackRequest); // Update cloned NodeLocal, RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); @@ -278,8 +282,8 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> } @Override - public List<ResourceRequest> allocate(NodeType type, SchedulerNode node, - ResourceRequest request) { + public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node, ResourceRequest request) { try { writeLock.lock(); @@ -296,11 +300,11 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode> } if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, request, resourceRequests); + allocateNodeLocal(schedulerKey, node, request, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, request, resourceRequests); + allocateRackLocal(schedulerKey, node, request, resourceRequests); } else{ - allocateOffSwitch(request, resourceRequests); + allocateOffSwitch(schedulerKey, request, resourceRequests); } return resourceRequests; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java ---------------------------------------------------------------------- diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java index cdb7c04..3cf5fa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java @@ -78,13 +78,14 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> { /** * Notify container allocated. + * @param schedulerKey SchedulerRequestKey for this ResourceRequest * @param type Type of the allocation * @param node Which node this container allocated on * @param request Which resource request to allocate * @return list of ResourceRequests deducted */ - List<ResourceRequest> allocate(NodeType type, SchedulerNode node, - ResourceRequest request); + List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node, ResourceRequest request); /** * We can still have pending requirement for a given NodeType and node http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index 3288d39..da1bc2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -331,7 +332,8 @@ public class Application { // Get resources from the ResourceManager Allocation allocation = resourceManager.getResourceScheduler().allocate( applicationAttemptId, new ArrayList<ResourceRequest>(ask), - new ArrayList<ContainerId>(), null, null, null, null); + new ArrayList<ContainerId>(), null, null, + new ContainerUpdates()); if (LOG.isInfoEnabled()) { LOG.info("-=======" + applicationAttemptId + System.lineSeparator() + http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 593de08..fbeca7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -251,6 +251,13 @@ public class MockAM { return allocate(req); } + public AllocateResponse sendContainerUpdateRequest( + List<UpdateContainerRequest> updateRequests) throws Exception { + final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null, + null, updateRequests); + return allocate(req); + } + public AllocateResponse allocate(AllocateRequest allocateRequest) throws Exception { UserGroupInformation ugi = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 32cdb1b..2d76127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -195,6 +195,12 @@ public class MockNM { isHealthy, resId); } + public 
NodeHeartbeatResponse nodeHeartbeat( + List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception { + return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(), + isHealthy, ++responseId); + } + public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats, List<Container> increasedConts, boolean isHealthy, int resId) throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index cb57f39..c135384 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -142,6 +142,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import 
org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; @@ -1072,7 +1074,8 @@ public class TestClientRMService { Container container = Container.newInstance( ContainerId.newContainerId(attemptId, 1), null, "", null, null, null); RMContainerImpl containerimpl = spy(new RMContainerImpl(container, - attemptId, null, "", rmContext)); + SchedulerRequestKey.extractFrom(container), attemptId, null, "", + rmContext)); Map<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<ApplicationAttemptId, RMAppAttempt>(); attempts.put(attemptId, rmAppAttemptImpl); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org