YARN-5959. RM changes to support change of container ExecutionType. (Arun Suresh via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a55bd84 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a55bd84 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a55bd84 Branch: refs/heads/HADOOP-13345 Commit: 0a55bd841ec0f2eb89a0383f4c589526e8b138d4 Parents: a605ff3 Author: Wangda Tan <wan...@apache.org> Authored: Thu Jan 5 10:31:05 2017 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Thu Jan 5 10:31:05 2017 -0800 ---------------------------------------------------------------------- .../v2/app/rm/TestRMContainerAllocator.java | 12 +- .../sls/scheduler/ResourceSchedulerWrapper.java | 9 +- .../sls/scheduler/SLSCapacityScheduler.java | 9 +- .../yarn/api/records/ContainerUpdateType.java | 9 +- .../yarn/api/records/UpdateContainerError.java | 6 + .../api/records/UpdateContainerRequest.java | 10 + .../src/main/proto/yarn_service_protos.proto | 3 +- .../api/impl/TestAMRMClientOnRMRestart.java | 11 +- .../OpportunisticContainerAllocator.java | 16 +- .../OpportunisticContainerContext.java | 8 +- .../server/scheduler/SchedulerRequestKey.java | 51 ++- .../ApplicationMasterService.java | 53 ++- ...pportunisticContainerAllocatorAMService.java | 18 +- .../server/resourcemanager/RMServerUtils.java | 142 +++--- .../rmapp/attempt/RMAppAttemptImpl.java | 6 +- .../rmcontainer/RMContainerImpl.java | 32 +- .../scheduler/AbstractYarnScheduler.java | 104 ++++- .../resourcemanager/scheduler/Allocation.java | 26 +- .../scheduler/AppSchedulingInfo.java | 70 +-- .../scheduler/ContainerUpdateContext.java | 267 +++++++++++ .../scheduler/ContainerUpdates.java | 68 +++ .../scheduler/SchedulerApplicationAttempt.java | 170 ++++++- .../scheduler/SchedulerUtils.java | 22 + .../scheduler/YarnScheduler.java | 9 +- .../scheduler/capacity/CapacityScheduler.java | 16 +- .../allocator/RegularContainerAllocator.java | 2 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 13 +- .../scheduler/fair/FSAppAttempt.java | 4 +- .../scheduler/fair/FairScheduler.java | 41 +- .../scheduler/fifo/FifoAppAttempt.java | 4 +- .../scheduler/fifo/FifoScheduler.java | 5 +- .../LocalitySchedulingPlacementSet.java | 46 +- .../placement/SchedulingPlacementSet.java | 5 +- .../server/resourcemanager/Application.java | 4 +- .../yarn/server/resourcemanager/MockAM.java | 7 + .../yarn/server/resourcemanager/MockNM.java | 6 + .../resourcemanager/TestClientRMService.java | 5 +- ...pportunisticContainerAllocatorAMService.java | 456 ++++++++++++++++++- .../attempt/TestRMAppAttemptTransitions.java | 19 +- .../rmcontainer/TestRMContainerImpl.java | 7 +- .../capacity/TestCapacityScheduler.java | 37 +- .../scheduler/capacity/TestChildQueueOrder.java | 5 +- .../scheduler/capacity/TestReservations.java | 9 +- .../scheduler/fair/FairSchedulerTestBase.java | 14 +- .../fair/TestContinuousScheduling.java | 9 +- .../scheduler/fair/TestFairScheduler.java | 35 +- .../scheduler/fifo/TestFifoScheduler.java | 46 +- 47 files changed, 1612 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index f9ee9cc..e6aee6e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -110,7 +110,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -132,6 +131,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -1707,8 +1707,7 @@ public class TestRMContainerAllocator { ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<UpdateContainerRequest> increaseRequests, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req @@ -1723,7 +1722,7 @@ public class TestRMContainerAllocator { lastBlacklistRemovals = blacklistRemovals; Allocation allocation = super.allocate( applicationAttemptId, askCopy, release, blacklistAdditions, - blacklistRemovals, increaseRequests, decreaseRequests); + blacklistRemovals, updateRequests); if (forceResourceLimit != null) { // Test wants to force the non-default resource limit allocation.setResourceLimit(forceResourceLimit); @@ -1754,8 +1753,7 @@ public class TestRMContainerAllocator { ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<UpdateContainerRequest> increaseRequest, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req @@ -1766,7 +1764,7 @@ public class TestRMContainerAllocator { SecurityUtil.setTokenServiceUseIp(false); Allocation normalAlloc = super.allocate( applicationAttemptId, askCopy, release, - blacklistAdditions, blacklistRemovals, null, null); + blacklistAdditions, blacklistRemovals, updateRequests); List<Container> containers = normalAlloc.getContainers(); if(containers.size() > 0) { // allocate excess container http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 79f934c..e66de2f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -68,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -206,14 +206,13 @@ final public class ResourceSchedulerWrapper public Allocation allocate(ApplicationAttemptId attemptId, List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, List<String> strings, List<String> strings2, - List<UpdateContainerRequest> increaseRequests, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; try { allocation = scheduler.allocate(attemptId, resourceRequests, - containerIds, strings, strings2, null, null); + containerIds, strings, strings2, updateRequests); return allocation; } finally { context.stop(); @@ -227,7 +226,7 @@ final public class ResourceSchedulerWrapper } } else { return scheduler.allocate(attemptId, - resourceRequests, containerIds, strings, strings2, null, null); + resourceRequests, containerIds, strings, strings2, updateRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 89f9ad3..8388273 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -51,10 +51,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -182,15 +182,14 @@ public class SLSCapacityScheduler extends CapacityScheduler implements public Allocation allocate(ApplicationAttemptId attemptId, List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, List<String> strings, List<String> strings2, - List<UpdateContainerRequest> increaseRequests, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; try { allocation = super .allocate(attemptId, resourceRequests, containerIds, strings, - strings2, increaseRequests, decreaseRequests); + strings2, updateRequests); return allocation; } finally { context.stop(); @@ -204,7 +203,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } else { return super.allocate(attemptId, resourceRequests, containerIds, strings, - strings2, increaseRequests, decreaseRequests); + strings2, updateRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java index 978ea09..6109cdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java @@ -39,7 +39,12 @@ public enum ContainerUpdateType { DECREASE_RESOURCE, /** - * Execution Type change. + * Execution Type promotion. */ - UPDATE_EXECUTION_TYPE + PROMOTE_EXECUTION_TYPE, + + /** + * Execution Type demotion. + */ + DEMOTE_EXECUTION_TYPE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java index 7102f7b..e7458cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java @@ -87,6 +87,12 @@ public abstract class UpdateContainerError { } @Override + public String toString() { + return "UpdateContainerError{reason=" + getReason() + ", " + + "req=" + getUpdateContainerRequest() + "}"; + } + + @Override public boolean equals(Object obj) { if (this == obj) { return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java index e4f7a82..925a7979 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -150,11 +150,13 @@ public abstract class UpdateContainerRequest extends AbstractResourceRequest { ContainerId cId = getContainerId(); ExecutionType execType = getExecutionType(); Resource capability = getCapability(); + ContainerUpdateType updateType = getContainerUpdateType(); result = prime * result + ((capability == null) ? 0 : capability.hashCode()); result = prime * result + ((cId == null) ? 0 : cId.hashCode()); result = prime * result + getContainerVersion(); result = prime * result + ((execType == null) ? 0 : execType.hashCode()); + result = prime * result + ((updateType== null) ? 0 : updateType.hashCode()); return result; } @@ -208,6 +210,14 @@ public abstract class UpdateContainerRequest extends AbstractResourceRequest { } else if (!execType.equals(other.getExecutionType())) { return false; } + ContainerUpdateType updateType = getContainerUpdateType(); + if (updateType == null) { + if (other.getContainerUpdateType() != null) { + return false; + } + } else if (!updateType.equals(other.getContainerUpdateType())) { + return false; + } return true; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index d9230d4..aed1580 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -63,7 +63,8 @@ message FinishApplicationMasterResponseProto { enum ContainerUpdateTypeProto { INCREASE_RESOURCE = 0; DECREASE_RESOURCE = 1; - UPDATE_EXECUTION_TYPE = 2; + PROMOTE_EXECUTION_TYPE = 2; + DEMOTE_EXECUTION_TYPE = 3; } message UpdateContainerRequestProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index f1c49f2..ac77446 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -574,8 +575,7 @@ public class TestAMRMClientOnRMRestart { ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<UpdateContainerRequest> increaseRequests, - List<UpdateContainerRequest> decreaseRequests) { + ContainerUpdates updateRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = @@ -586,13 +586,12 @@ public class TestAMRMClientOnRMRestart { } lastAsk = ask; lastRelease = release; - lastIncrease = increaseRequests; - lastDecrease = decreaseRequests; + lastIncrease = updateRequests.getIncreaseRequests(); + lastDecrease = updateRequests.getDecreaseRequests(); lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; return super.allocate(applicationAttemptId, askCopy, release, - blacklistAdditions, blacklistRemovals, increaseRequests, - decreaseRequests); + blacklistAdditions, blacklistRemovals, updateRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index c1300b2..6fd5228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -321,13 +321,21 @@ public class OpportunisticContainerAllocator { // before accepting an ask) Resource capability = normalizeCapability(appParams, rr); + return createContainer( + rmIdentifier, appParams.getContainerTokenExpiryInterval(), + SchedulerRequestKey.create(rr), userName, node, cId, capability); + } + + private Container createContainer(long rmIdentifier, long tokenExpiry, + SchedulerRequestKey schedulerKey, String userName, RemoteNode node, + ContainerId cId, Resource capability) { long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( cId, 0, node.getNodeId().toString(), userName, - capability, currTime + appParams.containerTokenExpiryInterval, + capability, currTime + tokenExpiry, tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, - rr.getPriority(), currTime, + schedulerKey.getPriority(), currTime, null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, ExecutionType.OPPORTUNISTIC); byte[] pwd = @@ -336,9 +344,9 @@ public class OpportunisticContainerAllocator { containerTokenIdentifier); Container container = BuilderUtils.newContainer( cId, node.getNodeId(), node.getHttpAddress(), - capability, rr.getPriority(), containerToken, + capability, schedulerKey.getPriority(), containerToken, containerTokenIdentifier.getExecutionType(), - rr.getAllocationRequestId()); + schedulerKey.getAllocationRequestId()); return container; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index a2f9f4d..1b1c5b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -150,8 +150,9 @@ public class OpportunisticContainerContext { resourceRequest.getNumContainers() + request.getNumContainers()); } if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at" + - "priority = "+ schedulerKey.getPriority() + LOG.info("# of outstandingOpReqs in ANY (at " + + "priority = " + schedulerKey.getPriority() + + ", allocationReqId = " + schedulerKey.getAllocationRequestId() + ", with capability = " + request.getCapability() + " ) : " + resourceRequest.getNumContainers()); } @@ -167,7 +168,8 @@ public class OpportunisticContainerContext { public void matchAllocationToOutstandingRequest(Resource capability, List<Container> allocatedContainers) { for (Container c : allocatedContainers) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c); + SchedulerRequestKey schedulerKey = + SchedulerRequestKey.extractFrom(c); Map<Resource, ResourceRequest> asks = outstandingOpReqs.get(schedulerKey); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index 9b7edbe..36a9149 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.server.scheduler; +import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; /** * Composite key for outstanding scheduler requests for any schedulable entity. @@ -31,6 +34,7 @@ public final class SchedulerRequestKey implements private final Priority priority; private final long allocationRequestId; + private final ContainerId containerToUpdate; /** * Factory method to generate a SchedulerRequestKey from a ResourceRequest. @@ -39,7 +43,13 @@ public final class SchedulerRequestKey implements */ public static SchedulerRequestKey create(ResourceRequest req) { return new SchedulerRequestKey(req.getPriority(), - req.getAllocationRequestId()); + req.getAllocationRequestId(), null); + } + + public static SchedulerRequestKey create(UpdateContainerRequest req, + SchedulerRequestKey schedulerRequestKey) { + return new SchedulerRequestKey(schedulerRequestKey.getPriority(), + schedulerRequestKey.getAllocationRequestId(), req.getContainerId()); } /** @@ -50,12 +60,16 @@ public final class SchedulerRequestKey implements */ public static SchedulerRequestKey extractFrom(Container container) { return new SchedulerRequestKey(container.getPriority(), - container.getAllocationRequestId()); + container.getAllocationRequestId(), null); } - SchedulerRequestKey(Priority priority, long allocationRequestId) { + + + public SchedulerRequestKey(Priority priority, long allocationRequestId, + ContainerId containerToUpdate) { this.priority = priority; this.allocationRequestId = allocationRequestId; + this.containerToUpdate = containerToUpdate; } /** @@ -76,6 +90,10 @@ public final class SchedulerRequestKey implements return allocationRequestId; } + public ContainerId getContainerToUpdate() { + return containerToUpdate; + } + @Override public int compareTo(SchedulerRequestKey o) { if (o == null) { @@ -85,6 +103,15 @@ public final class SchedulerRequestKey implements return 1; } } + + // Ensure updates are ranked higher + if (this.containerToUpdate == null && o.containerToUpdate != null) { + return -1; + } + if (this.containerToUpdate != null && o.containerToUpdate == null) { + return 1; + } + int priorityCompare = o.getPriority().compareTo(priority); // we first sort by priority and then by allocationRequestId if (priorityCompare != 0) { @@ -107,16 +134,21 @@ public final class SchedulerRequestKey implements if (getAllocationRequestId() != that.getAllocationRequestId()) { return false; } - return getPriority() != null ? - getPriority().equals(that.getPriority()) : - that.getPriority() == null; + if (!getPriority().equals(that.getPriority())) { + return false; + } + return containerToUpdate != null ? + containerToUpdate.equals(that.containerToUpdate) : + that.containerToUpdate == null; } @Override public int hashCode() { - int result = getPriority() != null ? getPriority().hashCode() : 0; - result = 31 * result + (int) (getAllocationRequestId() ^ ( - getAllocationRequestId() >>> 32)); + int result = priority != null ? priority.hashCode() : 0; + result = 31 * result + (int) (allocationRequestId ^ (allocationRequestId + >>> 32)); + result = 31 * result + (containerToUpdate != null ? containerToUpdate + .hashCode() : 0); return result; } @@ -125,6 +157,7 @@ public final class SchedulerRequestKey implements return "SchedulerRequestKey{" + "priority=" + priority + ", allocationRequestId=" + allocationRequestId + + ", containerToUpdate=" + containerToUpdate + '}'; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 9fd1845..70a46a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerError; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; @@ -93,7 +92,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security @@ -559,12 +561,10 @@ public class ApplicationMasterService extends AbstractService implements // Split Update Resource Requests into increase and decrease. // No Exceptions are thrown here. All update errors are aggregated // and returned to the AM. - List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>(); - List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>(); - List<UpdateContainerError> updateContainerErrors = + List<UpdateContainerError> updateErrors = new ArrayList<>(); + ContainerUpdates containerUpdateRequests = RMServerUtils.validateAndSplitUpdateResourceRequests( - rmContext, request, maximumCapacity, - increaseResourceReqs, decreaseResourceReqs); + rmContext, request, maximumCapacity, updateErrors); // Send new requests to appAttempt. Allocation allocation; @@ -580,7 +580,7 @@ public class ApplicationMasterService extends AbstractService implements allocation = this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals, - increaseResourceReqs, decreaseResourceReqs); + containerUpdateRequests); } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { @@ -596,7 +596,7 @@ public class ApplicationMasterService extends AbstractService implements } // Notify the AM of container update errors - addToUpdateContainerErrors(allocateResponse, updateContainerErrors); + addToUpdateContainerErrors(allocateResponse, updateErrors); // update the response with the deltas of node status changes List<RMNode> updatedNodes = new ArrayList<RMNode>(); @@ -630,15 +630,7 @@ public class ApplicationMasterService extends AbstractService implements .pullJustFinishedContainers()); allocateResponse.setAvailableResources(allocation.getResourceLimit()); - // Handling increased containers - addToUpdatedContainers( - allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, - allocation.getIncreasedContainers()); - - // Handling decreased containers - addToUpdatedContainers( - allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, - allocation.getDecreasedContainers()); + addToContainerUpdates(appAttemptId, allocateResponse, allocation); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); @@ -658,6 +650,33 @@ public class ApplicationMasterService extends AbstractService implements .getApplicationPriority()); } + private void addToContainerUpdates(ApplicationAttemptId appAttemptId, + AllocateResponse allocateResponse, Allocation allocation) { + // Handling increased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, + allocation.getIncreasedContainers()); + + // Handling decreased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, + allocation.getDecreasedContainers()); + + // Handling promoted containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + allocation.getPromotedContainers()); + + // Handling demoted containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + allocation.getDemotedContainers()); + + addToUpdateContainerErrors(allocateResponse, + ((AbstractYarnScheduler)rScheduler) + .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors()); + } + protected void addToUpdateContainerErrors(AllocateResponse allocateResponse, List<UpdateContainerError> updateContainerErrors) { if (!updateContainerErrors.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 9d4c092..8f3a888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; - import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; - import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; @@ -57,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; @@ -69,9 +68,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; - import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; @@ -251,6 +250,7 @@ public class OpportunisticContainerAllocatorAMService // Allocate GUARANTEED containers. request.setAskList(partitionedAsks.getGuaranteed()); + super.allocateInternal(appAttemptId, request, allocateResponse); } @@ -298,15 +298,9 @@ public class OpportunisticContainerAllocatorAMService boolean isRemotelyAllocated) { for (Container container : allocContainers) { // Create RMContainer - SchedulerApplicationAttempt appAttempt = - ((AbstractYarnScheduler) rmContext.getScheduler()) - .getCurrentAttemptForContainer(container.getId()); - RMContainer rmContainer = new RMContainerImpl(container, - appAttempt.getApplicationAttemptId(), container.getNodeId(), - appAttempt.getUser(), rmContext, isRemotelyAllocated); - appAttempt.addRMContainer(container.getId(), rmContainer); - ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( - container.getNodeId()).allocateContainer(rmContainer); + RMContainer rmContainer = + SchedulerUtils.createOpportunisticRmContainer( + rmContext, container, isRemotelyAllocated); rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.ACQUIRED)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 74898ca..ebbeb0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt .RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler @@ -80,7 +83,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; */ public class RMServerUtils { - private static final String UPDATE_OUTSTANDING_ERROR = + public static final String UPDATE_OUTSTANDING_ERROR = "UPDATE_OUTSTANDING_ERROR"; private static final String INCORRECT_CONTAINER_VERSION_ERROR = "INCORRECT_CONTAINER_VERSION_ERROR"; @@ -124,74 +127,105 @@ public class RMServerUtils { /** * Check if we have: - * - Request for same containerId and different target resource - * - If targetResources violates maximum/minimumAllocation - * @param rmContext RM context - * @param request Allocate Request - * @param maximumAllocation Maximum Allocation - * @param increaseResourceReqs Increase Resource Request - * @param decreaseResourceReqs Decrease Resource Request - * @return List of container Errors + * - Request for same containerId and different target resource. + * - If targetResources violates maximum/minimumAllocation. + * @param rmContext RM context. + * @param request Allocate Request. + * @param maximumAllocation Maximum Allocation. + * @param updateErrors Container update errors. + * @return ContainerUpdateRequests. */ - public static List<UpdateContainerError> + public static ContainerUpdates validateAndSplitUpdateResourceRequests(RMContext rmContext, AllocateRequest request, Resource maximumAllocation, - List<UpdateContainerRequest> increaseResourceReqs, - List<UpdateContainerRequest> decreaseResourceReqs) { - List<UpdateContainerError> errors = new ArrayList<>(); + List<UpdateContainerError> updateErrors) { + ContainerUpdates updateRequests = + new ContainerUpdates(); Set<ContainerId> outstandingUpdate = new HashSet<>(); for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { RMContainer rmContainer = rmContext.getScheduler().getRMContainer( updateReq.getContainerId()); - String msg = null; - if (rmContainer == null) { - msg = INVALID_CONTAINER_ID; - } - // Only allow updates if the requested version matches the current - // version - if (msg == null && updateReq.getContainerVersion() != - rmContainer.getContainer().getVersion()) { - msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" - + updateReq.getContainerVersion() + "|" - + rmContainer.getContainer().getVersion(); - } - // No more than 1 container update per request. - if (msg == null && - outstandingUpdate.contains(updateReq.getContainerId())) { - msg = UPDATE_OUTSTANDING_ERROR; - } + String msg = validateContainerIdAndVersion(outstandingUpdate, + updateReq, rmContainer); + ContainerUpdateType updateType = updateReq.getContainerUpdateType(); if (msg == null) { - Resource original = rmContainer.getContainer().getResource(); - Resource target = updateReq.getCapability(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, false)) { - decreaseResourceReqs.add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); + if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) && + (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) { + Resource original = rmContainer.getContainer().getResource(); + Resource target = updateReq.getCapability(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, false)) { + updateRequests.getDecreaseRequests().add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + // This is an increase request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, true)) { + updateRequests.getIncreaseRequests().add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } } } else { - // This is an increase request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, true)) { - increaseResourceReqs.add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); - } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + ExecutionType original = rmContainer.getExecutionType(); + ExecutionType target = updateReq.getExecutionType(); + if (target != original) { + if (target == ExecutionType.GUARANTEED && + original == ExecutionType.OPPORTUNISTIC) { + updateRequests.getPromotionRequests().add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else if (target == ExecutionType.OPPORTUNISTIC && + original == ExecutionType.GUARANTEED) { + updateRequests.getDemotionRequests().add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } } } } - if (msg != null) { - UpdateContainerError updateError = RECORD_FACTORY - .newRecordInstance(UpdateContainerError.class); - updateError.setReason(msg); - updateError.setUpdateContainerRequest(updateReq); - errors.add(updateError); - } + checkAndcreateUpdateError(updateErrors, updateReq, msg); + } + return updateRequests; + } + + private static void checkAndcreateUpdateError( + List<UpdateContainerError> errors, UpdateContainerRequest updateReq, + String msg) { + if (msg != null) { + UpdateContainerError updateError = RECORD_FACTORY + .newRecordInstance(UpdateContainerError.class); + updateError.setReason(msg); + updateError.setUpdateContainerRequest(updateReq); + errors.add(updateError); + } + } + + private static String validateContainerIdAndVersion( + Set<ContainerId> outstandingUpdate, UpdateContainerRequest updateReq, + RMContainer rmContainer) { + String msg = null; + if (rmContainer == null) { + msg = INVALID_CONTAINER_ID; + } + // Only allow updates if the requested version matches the current + // version + if (msg == null && updateReq.getContainerVersion() != + rmContainer.getContainer().getVersion()) { + msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" + + updateReq.getContainerVersion() + "|" + + rmContainer.getContainer().getVersion(); + } + // No more than 1 container update per request. + if (msg == null && + outstandingUpdate.contains(updateReq.getContainerId())) { + msg = UPDATE_OUTSTANDING_ERROR; } - return errors; + return msg; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f7ae488..ab84985 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -1072,7 +1073,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Collections.singletonList(appAttempt.amReq), EMPTY_CONTAINER_RELEASE_LIST, amBlacklist.getBlacklistAdditions(), - amBlacklist.getBlacklistRemovals(), null, null); + amBlacklist.getBlacklistRemovals(), + new ContainerUpdates()); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); @@ -1096,7 +1098,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, - null, null, null); + null, new ContainerUpdates()); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, // and is put in SchedulerApplication#newlyAllocatedContainers. http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index dbc6169..79709a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -108,6 +108,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED) + .addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED, + RMContainerEventType.ACQUIRED) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, RMContainerEventType.FINISHED, new FinishedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, @@ -125,6 +127,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.ACQUIRED) + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) @@ -163,13 +167,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { private final WriteLock writeLock; private final ApplicationAttemptId appAttemptId; private final NodeId nodeId; - private final Container container; private final RMContext rmContext; private final EventHandler eventHandler; private final ContainerAllocationExpirer containerAllocationExpirer; private final String user; private final String nodeLabelExpression; + private volatile Container container; private Resource reservedResource; private NodeId reservedNode; private SchedulerRequestKey reservedSchedulerKey; @@ -188,44 +192,44 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { private boolean isExternallyAllocated; private SchedulerRequestKey allocatedSchedulerKey; - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { - this(container, appAttemptId, nodeId, user, rmContext, System + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System .currentTimeMillis(), ""); } - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, boolean isExternallyAllocated) { - this(container, appAttemptId, nodeId, user, rmContext, System + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System .currentTimeMillis(), "", isExternallyAllocated); } private boolean saveNonAMContainerMetaInfo; - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, String nodeLabelExpression) { - this(container, appAttemptId, nodeId, user, rmContext, System + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System .currentTimeMillis(), nodeLabelExpression); } - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, long creationTime, String nodeLabelExpression) { - this(container, appAttemptId, nodeId, user, rmContext, creationTime, - nodeLabelExpression, false); + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, + creationTime, nodeLabelExpression, false); } - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, long creationTime, String nodeLabelExpression, boolean isExternallyAllocated) { this.stateMachine = stateMachineFactory.make(this); this.nodeId = nodeId; this.container = container; - this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container); + this.allocatedSchedulerKey = schedulerKey; this.appAttemptId = appAttemptId; this.user = user; this.creationTime = creationTime; @@ -276,6 +280,10 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { return this.container; } + public void setContainer(Container container) { + this.container = container; + } + @Override public RMContainerState getState() { this.readLock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c1a985d..acfcde8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -81,6 +83,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; + + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -504,9 +511,11 @@ public abstract class AbstractYarnScheduler ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = - new RMContainerImpl(container, attemptId, node.getNodeID(), - applications.get(attemptId.getApplicationId()).getUser(), rmContext, - status.getCreationTime(), status.getNodeLabelExpression()); + new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), attemptId, + node.getNodeID(), applications.get( + attemptId.getApplicationId()).getUser(), rmContext, + status.getCreationTime(), status.getNodeLabelExpression()); return rmContainer; } @@ -1053,4 +1062,93 @@ public abstract class AbstractYarnScheduler normalizeRequest(ask); } } + + protected void handleExecutionTypeUpdates( + SchedulerApplicationAttempt appAttempt, + List<UpdateContainerRequest> promotionRequests, + List<UpdateContainerRequest> demotionRequests) { + if (promotionRequests != null && !promotionRequests.isEmpty()) { + LOG.info("Promotion Update requests : " + promotionRequests); + handlePromotionRequests(appAttempt, promotionRequests); + } + if (demotionRequests != null && !demotionRequests.isEmpty()) { + LOG.info("Demotion Update requests : " + demotionRequests); + handleDemotionRequests(appAttempt, demotionRequests); + } + } + + private void handlePromotionRequests( + SchedulerApplicationAttempt applicationAttempt, + List<UpdateContainerRequest> updateContainerRequests) { + for (UpdateContainerRequest uReq : updateContainerRequests) { + RMContainer rmContainer = + rmContext.getScheduler().getRMContainer(uReq.getContainerId()); + // Check if this is a container update + // And not in the middle of a Demotion + if (rmContainer != null) { + // Check if this is an executionType change request + // If so, fix the rr to make it look like a normal rr + // with relaxLocality=false and numContainers=1 + SchedulerNode schedulerNode = rmContext.getScheduler() + .getSchedulerNode(rmContainer.getContainer().getNodeId()); + + // Add only if no outstanding promote requests exist. + if (!applicationAttempt.getUpdateContext() + .checkAndAddToOutstandingIncreases( + rmContainer, schedulerNode, uReq)) { + applicationAttempt.addToUpdateContainerErrors( + UpdateContainerError.newInstance( + RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); + } + } else { + LOG.warn("Cannot promote non-existent (or completed) Container [" + + uReq.getContainerId() + "]"); + } + } + } + + private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt, + List<UpdateContainerRequest> demotionRequests) { + OpportunisticContainerContext oppCntxt = + appAttempt.getOpportunisticContainerContext(); + for (UpdateContainerRequest uReq : demotionRequests) { + RMContainer rmContainer = + rmContext.getScheduler().getRMContainer(uReq.getContainerId()); + if (rmContainer != null) { + if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases( + rmContainer.getContainer())) { + RMContainer demotedRMContainer = + createDemotedRMContainer(appAttempt, oppCntxt, rmContainer); + appAttempt.addToNewlyDemotedContainers( + uReq.getContainerId(), demotedRMContainer); + } else { + appAttempt.addToUpdateContainerErrors( + UpdateContainerError.newInstance( + RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); + } + } else { + LOG.warn("Cannot demote non-existent (or completed) Container [" + + uReq.getContainerId() + "]"); + } + } + } + + private RMContainer createDemotedRMContainer( + SchedulerApplicationAttempt appAttempt, + OpportunisticContainerContext oppCntxt, + RMContainer rmContainer) { + SchedulerRequestKey sk = + SchedulerRequestKey.extractFrom(rmContainer.getContainer()); + Container demotedContainer = BuilderUtils.newContainer( + ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), + oppCntxt.getContainerIdGenerator().generateContainerId()), + rmContainer.getContainer().getNodeId(), + rmContainer.getContainer().getNodeHttpAddress(), + rmContainer.getContainer().getResource(), + sk.getPriority(), null, ExecutionType.OPPORTUNISTIC, + sk.getAllocationRequestId()); + demotedContainer.setVersion(rmContainer.getContainer().getVersion()); + return SchedulerUtils.createOpportunisticRmContainer( + rmContext, demotedContainer, false); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index b81da2b..43eadab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -36,6 +36,8 @@ public class Allocation { final List<NMToken> nmTokens; final List<Container> increasedContainers; final List<Container> decreasedContainers; + final List<Container> promotedContainers; + final List<Container> demotedContainers; private Resource resourceLimit; @@ -50,13 +52,23 @@ public class Allocation { Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) { this(containers, resourceLimit,strictContainers, fungibleContainers, - fungibleResources, nmTokens, null, null); + fungibleResources, nmTokens, null, null, null, null); } - + public Allocation(List<Container> containers, Resource resourceLimit, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, List<ResourceRequest> fungibleResources, List<NMToken> nmTokens, List<Container> increasedContainers, List<Container> decreasedContainer) { + this(containers, resourceLimit,strictContainers, fungibleContainers, + fungibleResources, nmTokens, increasedContainers, decreasedContainer, + null, null); + } + + public Allocation(List<Container> containers, Resource resourceLimit, + Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, + List<ResourceRequest> fungibleResources, List<NMToken> nmTokens, + List<Container> increasedContainers, List<Container> decreasedContainer, + List<Container> promotedContainers, List<Container> demotedContainer) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; @@ -65,6 +77,8 @@ public class Allocation { this.nmTokens = nmTokens; this.increasedContainers = increasedContainers; this.decreasedContainers = decreasedContainer; + this.promotedContainers = promotedContainers; + this.demotedContainers = demotedContainer; } public List<Container> getContainers() { @@ -99,6 +113,14 @@ public class Allocation { return decreasedContainers; } + public List<Container> getPromotedContainers() { + return promotedContainers; + } + + public List<Container> getDemotedContainers() { + return demotedContainers; + } + @VisibleForTesting public void setResourceLimit(Resource resource) { this.resourceLimit = resource; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 0551df1..d901d90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -55,7 +55,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; - /** * This class keeps track of all the consumption of an application. This also * keeps track of current running/completed containers for the application. @@ -92,10 +91,11 @@ public class AppSchedulingInfo { final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>> containerIncreaseRequestMap = new ConcurrentHashMap<>(); - private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; + public final ContainerUpdateContext updateContext; + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, long epoch, ResourceUsage appResourceUsage) { @@ -109,6 +109,7 @@ public class AppSchedulingInfo { this.appResourceUsage = appResourceUsage; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + updateContext = new ContainerUpdateContext(this); readLock = lock.readLock(); writeLock = lock.writeLock(); } @@ -376,6 +377,10 @@ public class AppSchedulingInfo { } } + public ContainerUpdateContext getUpdateContext() { + return updateContext; + } + /** * The ApplicationMaster is updating resource requirements for the * application, by asking for more resources and releasing resources acquired @@ -413,29 +418,9 @@ public class AppSchedulingInfo { } // Update scheduling placement set - for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) { - SchedulerRequestKey schedulerRequestKey = entry.getKey(); - - if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) { - schedulerKeyToPlacementSets.put(schedulerRequestKey, - new LocalitySchedulingPlacementSet<>(this)); - } - - // Update placement set - ResourceRequestUpdateResult pendingAmountChanges = - schedulerKeyToPlacementSets.get(schedulerRequestKey) - .updateResourceRequests( - entry.getValue().values(), - recoverPreemptedRequestForAContainer); - - if (null != pendingAmountChanges) { - updatePendingResources( - pendingAmountChanges.getLastAnyResourceRequest(), - pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey, - queue.getMetrics()); - offswitchResourcesUpdated = true; - } - } + offswitchResourcesUpdated = + addToPlacementSets( + recoverPreemptedRequestForAContainer, dedupRequests); return offswitchResourcesUpdated; } finally { @@ -443,6 +428,37 @@ public class AppSchedulingInfo { } } + boolean addToPlacementSets( + boolean recoverPreemptedRequestForAContainer, + Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) { + boolean offswitchResourcesUpdated = false; + for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : + dedupRequests.entrySet()) { + SchedulerRequestKey schedulerRequestKey = entry.getKey(); + + if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) { + schedulerKeyToPlacementSets.put(schedulerRequestKey, + new LocalitySchedulingPlacementSet<>(this)); + } + + // Update placement set + ResourceRequestUpdateResult pendingAmountChanges = + schedulerKeyToPlacementSets.get(schedulerRequestKey) + .updateResourceRequests( + entry.getValue().values(), + recoverPreemptedRequestForAContainer); + + if (null != pendingAmountChanges) { + updatePendingResources( + pendingAmountChanges.getLastAnyResourceRequest(), + pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey, + queue.getMetrics()); + offswitchResourcesUpdated = true; + } + } + return offswitchResourcesUpdated; + } + private void updatePendingResources(ResourceRequest lastRequest, ResourceRequest request, SchedulerRequestKey schedulerKey, QueueMetrics metrics) { @@ -717,8 +733,8 @@ public class AppSchedulingInfo { updateMetricsForAllocatedContainer(type, containerAllocated); } - return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node, - request); + return schedulerKeyToPlacementSets.get(schedulerKey) + .allocate(schedulerKey, type, node, request); } finally { writeLock.unlock(); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org