Repository: hadoop Updated Branches: refs/heads/YARN-5734 b59c20d17 -> 6e1a54403 (forced update)
YARN-5966. AMRMClient changes to support ExecutionType update. (asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aaf106fd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aaf106fd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aaf106fd Branch: refs/heads/YARN-5734 Commit: aaf106fde35ec97e2e2ea4d7a67434038c4273ac Parents: 4164a20 Author: Arun Suresh <asur...@apache.org> Authored: Tue Feb 14 06:08:27 2017 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Tue Feb 14 06:09:10 2017 -0800 ---------------------------------------------------------------------- .../yarn/api/records/UpdateContainerError.java | 19 +- .../src/main/proto/yarn_service_protos.proto | 1 + .../hadoop/yarn/client/api/AMRMClient.java | 33 +- .../yarn/client/api/async/AMRMClientAsync.java | 33 +- .../api/async/impl/AMRMClientAsyncImpl.java | 7 +- .../yarn/client/api/impl/AMRMClientImpl.java | 111 +++-- .../yarn/client/api/impl/TestAMRMClient.java | 60 ++- .../api/impl/TestAMRMClientOnRMRestart.java | 8 +- .../TestOpportunisticContainerAllocation.java | 400 +++++++++++++++++-- .../impl/pb/UpdateContainerErrorPBImpl.java | 16 + .../server/resourcemanager/RMServerUtils.java | 14 +- ...pportunisticContainerAllocatorAMService.java | 5 +- .../capacity/TestIncreaseAllocationExpirer.java | 4 +- 13 files changed, 587 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java index e7458cf..4d184cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java @@ -59,6 +59,22 @@ public abstract class UpdateContainerError { public abstract void setReason(String reason); /** + * Get current container version. + * @return Current container Version. + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract int getCurrentContainerVersion(); + + /** + * Set current container version. + * @param currentVersion Current container version. + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setCurrentContainerVersion(int currentVersion); + + /** * Get the {@code UpdateContainerRequest} that was not satisfiable. * @return UpdateContainerRequest */ @@ -89,6 +105,7 @@ public abstract class UpdateContainerError { @Override public String toString() { return "UpdateContainerError{reason=" + getReason() + ", " + + "currentVersion=" + getCurrentContainerVersion() + ", " + "req=" + getUpdateContainerRequest() + "}"; } @@ -120,6 +137,6 @@ public abstract class UpdateContainerError { } else if (!req.equals(other.getUpdateContainerRequest())) { return false; } - return true; + return getCurrentContainerVersion() == other.getCurrentContainerVersion(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index df3c852..c6647c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -78,6 +78,7 @@ message UpdateContainerRequestProto { message UpdateContainerErrorProto { optional string reason = 1; optional UpdateContainerRequestProto update_request = 2; + optional int32 current_container_version = 3; } message AllocateRequestProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 52155f5..15d0065 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -33,17 +33,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; +import org.apache.hadoop.yarn.util.resource.Resources; @InterfaceAudience.Public @InterfaceStability.Stable @@ -518,12 +521,38 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends * ResourceManager to change the existing resource allocation to the target * resource allocation. * + * @deprecated use + * {@link #requestContainerUpdate(Container, UpdateContainerRequest)} + * * @param container The container returned from the last successful resource * allocation or resource change * @param capability The target resource capability of the container */ - public abstract void requestContainerResourceChange( - Container container, Resource capability); + @Deprecated + public void requestContainerResourceChange( + Container container, Resource capability) { + Preconditions.checkNotNull(container, "Container cannot be null!!"); + Preconditions.checkNotNull(capability, + "UpdateContainerRequest cannot be null!!"); + requestContainerUpdate(container, UpdateContainerRequest.newInstance( + container.getVersion(), container.getId(), + Resources.fitsIn(capability, container.getResource()) ? + ContainerUpdateType.DECREASE_RESOURCE : + ContainerUpdateType.INCREASE_RESOURCE, + capability, null)); + } + + /** + * Request a container update before calling <code>allocate</code>. + * Any previous pending update request of the same container will be + * removed. + * + * @param container The container returned from the last successful resource + * allocation or update + * @param updateContainerRequest The <code>UpdateContainerRequest</code>. + */ + public abstract void requestContainerUpdate( + Container container, UpdateContainerRequest updateContainerRequest); /** * Release containers assigned by the Resource Manager. If the app cannot use http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index d2195a6..4cb27cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -36,11 +36,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.resource.Resources; /** * <code>AMRMClientAsync</code> handles communication with the ResourceManager @@ -284,12 +287,38 @@ extends AbstractService { * ResourceManager to change the existing resource allocation to the target * resource allocation. * + * @deprecated use + * {@link #requestContainerUpdate(Container, UpdateContainerRequest)} + * * @param container The container returned from the last successful resource * allocation or resource change * @param capability The target resource capability of the container */ - public abstract void requestContainerResourceChange( - Container container, Resource capability); + @Deprecated + public void requestContainerResourceChange( + Container container, Resource capability) { + Preconditions.checkNotNull(container, "Container cannot be null!!"); + Preconditions.checkNotNull(capability, + "UpdateContainerRequest cannot be null!!"); + requestContainerUpdate(container, UpdateContainerRequest.newInstance( + container.getVersion(), container.getId(), + Resources.fitsIn(capability, container.getResource()) ? + ContainerUpdateType.DECREASE_RESOURCE : + ContainerUpdateType.INCREASE_RESOURCE, + capability, null)); + } + + /** + * Request a container update before calling <code>allocate</code>. + * Any previous pending update request of the same container will be + * removed. + * + * @param container The container returned from the last successful resource + * allocation or update + * @param updateContainerRequest The <code>UpdateContainerRequest</code>. + */ + public abstract void requestContainerUpdate( + Container container, UpdateContainerRequest updateContainerRequest); /** * Release containers assigned by the Resource Manager. If the app cannot use http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 3e72d3f..9e2c0e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -207,9 +208,9 @@ extends AMRMClientAsync<T> { } @Override - public void requestContainerResourceChange( - Container container, Resource capability) { - client.requestContainerResourceChange(container, capability); + public void requestContainerUpdate(Container container, + UpdateContainerRequest updateContainerRequest) { + client.requestContainerUpdate(container, updateContainerRequest); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 44fc1e0..7da91de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -169,15 +169,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>(); // change map holds container resource change requests between two allocate() // calls, and are cleared after each successful allocate() call. - protected final Map<ContainerId, SimpleEntry<Container, Resource>> change = - new HashMap<>(); + protected final Map<ContainerId, + SimpleEntry<Container, UpdateContainerRequest>> change = new HashMap<>(); // pendingChange map holds history of container resource change requests in // case AM needs to reregister with the ResourceManager. // Change requests are removed from this map if RM confirms the change // through allocate response, or if RM confirms that the container has been // completed. - protected final Map<ContainerId, SimpleEntry<Container, Resource>> - pendingChange = new HashMap<>(); + protected final Map<ContainerId, + SimpleEntry<Container, UpdateContainerRequest>> pendingChange = + new HashMap<>(); public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); @@ -259,7 +260,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { AllocateRequest allocateRequest = null; List<String> blacklistToAdd = new ArrayList<String>(); List<String> blacklistToRemove = new ArrayList<String>(); - Map<ContainerId, SimpleEntry<Container, Resource>> oldChange = + Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange = new HashMap<>(); try { synchronized (this) { @@ -374,14 +375,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { // // Only insert entries from the cached oldChange map // that do not exist in the current change map: - for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry : + for (Map.Entry<ContainerId, + SimpleEntry<Container, UpdateContainerRequest>> entry : oldChange.entrySet()) { ContainerId oldContainerId = entry.getKey(); Container oldContainer = entry.getValue().getKey(); - Resource oldResource = entry.getValue().getValue(); + UpdateContainerRequest oldupdate = entry.getValue().getValue(); if (change.get(oldContainerId) == null) { change.put( - oldContainerId, new SimpleEntry<>(oldContainer, oldResource)); + oldContainerId, new SimpleEntry<>(oldContainer, oldupdate)); } } blacklistAdditions.addAll(blacklistToAdd); @@ -394,19 +396,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { private List<UpdateContainerRequest> createUpdateList() { List<UpdateContainerRequest> updateList = new ArrayList<>(); - for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry : - change.entrySet()) { - Resource targetCapability = entry.getValue().getValue(); - Resource currCapability = entry.getValue().getKey().getResource(); - int version = entry.getValue().getKey().getVersion(); + for (Map.Entry<ContainerId, SimpleEntry<Container, + UpdateContainerRequest>> entry : change.entrySet()) { + Resource targetCapability = entry.getValue().getValue().getCapability(); + ExecutionType targetExecType = + entry.getValue().getValue().getExecutionType(); ContainerUpdateType updateType = - ContainerUpdateType.INCREASE_RESOURCE; - if (Resources.fitsIn(targetCapability, currCapability)) { - updateType = ContainerUpdateType.DECREASE_RESOURCE; - } + entry.getValue().getValue().getContainerUpdateType(); + int version = entry.getValue().getKey().getVersion(); updateList.add( UpdateContainerRequest.newInstance(version, entry.getKey(), - updateType, targetCapability, null)); + updateType, targetCapability, targetExecType)); } return updateList; } @@ -591,21 +591,47 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } @Override - public synchronized void requestContainerResourceChange( - Container container, Resource capability) { - validateContainerResourceChangeRequest( - container.getId(), container.getResource(), capability); + public synchronized void requestContainerUpdate( + Container container, UpdateContainerRequest updateContainerRequest) { + Preconditions.checkNotNull(container, "Container cannot be null!!"); + Preconditions.checkNotNull(updateContainerRequest, + "UpdateContainerRequest cannot be null!!"); + LOG.info("Requesting Container update : " + + "container=" + container + ", " + + "updateType=" + updateContainerRequest.getContainerUpdateType() + ", " + + "targetCapability=" + updateContainerRequest.getCapability() + ", " + + "targetExecType=" + updateContainerRequest.getExecutionType()); + if (updateContainerRequest.getCapability() != null && + updateContainerRequest.getExecutionType() == null) { + validateContainerResourceChangeRequest( + updateContainerRequest.getContainerUpdateType(), + container.getId(), container.getResource(), + updateContainerRequest.getCapability()); + } else if (updateContainerRequest.getExecutionType() != null && + updateContainerRequest.getCapability() == null) { + validateContainerExecTypeChangeRequest( + updateContainerRequest.getContainerUpdateType(), + container.getId(), container.getExecutionType(), + updateContainerRequest.getExecutionType()); + } else if (updateContainerRequest.getExecutionType() == null && + updateContainerRequest.getCapability() == null) { + throw new IllegalArgumentException("Both target Capability and" + + "target Execution Type are null"); + } else { + throw new IllegalArgumentException("Support currently exists only for" + + " EITHER update of Capability OR update of Execution Type NOT both"); + } if (change.get(container.getId()) == null) { change.put(container.getId(), - new SimpleEntry<>(container, capability)); + new SimpleEntry<>(container, updateContainerRequest)); } else { - change.get(container.getId()).setValue(capability); + change.get(container.getId()).setValue(updateContainerRequest); } if (pendingChange.get(container.getId()) == null) { pendingChange.put(container.getId(), - new SimpleEntry<>(container, capability)); + new SimpleEntry<>(container, updateContainerRequest)); } else { - pendingChange.get(container.getId()).setValue(capability); + pendingChange.get(container.getId()).setValue(updateContainerRequest); } } @@ -755,7 +781,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } private void validateContainerResourceChangeRequest( - ContainerId containerId, Resource original, Resource target) { + ContainerUpdateType updateType, ContainerId containerId, + Resource original, Resource target) { Preconditions.checkArgument(containerId != null, "ContainerId cannot be null"); Preconditions.checkArgument(original != null, @@ -768,6 +795,36 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { Preconditions.checkArgument(!Resources.equals(Resources.none(), target) && Resources.fitsIn(Resources.none(), target), "Target resource capability must be greater than 0"); + if (ContainerUpdateType.DECREASE_RESOURCE == updateType) { + Preconditions.checkArgument(Resources.fitsIn(target, original), + "Target resource capability must fit in Original capability"); + } else { + Preconditions.checkArgument(Resources.fitsIn(original, target), + "Target resource capability must be more than Original capability"); + + } + } + + private void validateContainerExecTypeChangeRequest( + ContainerUpdateType updateType, ContainerId containerId, + ExecutionType original, ExecutionType target) { + Preconditions.checkArgument(containerId != null, + "ContainerId cannot be null"); + Preconditions.checkArgument(original != null, + "Original Execution Type cannot be null"); + Preconditions.checkArgument(target != null, + "Target Execution Type cannot be null"); + if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) { + Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC + && original == ExecutionType.GUARANTEED, + "Incorrect Container update request, target should be" + + " OPPORTUNISTIC and original should be GUARANTEED"); + } else { + Preconditions.checkArgument(target == ExecutionType.GUARANTEED + && original == ExecutionType.OPPORTUNISTIC, + "Incorrect Container update request, target should be" + + " GUARANTEED and original should be OPPORTUNISTIC"); + } } private void addResourceRequestToAsk(ResourceRequest remoteRequest) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 8b1bbc7..4f73bac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -51,29 +51,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.UpdatedContainer; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -1058,26 +1036,36 @@ public class TestAMRMClient { (AMRMClientImpl<ContainerRequest>) amClient; Assert.assertEquals(0, amClientImpl.change.size()); // verify newer request overwrites older request for the container1 - amClientImpl.requestContainerResourceChange( - container1, Resource.newInstance(2048, 1)); - amClientImpl.requestContainerResourceChange( - container1, Resource.newInstance(4096, 1)); + amClientImpl.requestContainerUpdate(container1, + UpdateContainerRequest.newInstance(container1.getVersion(), + container1.getId(), ContainerUpdateType.INCREASE_RESOURCE, + Resource.newInstance(2048, 1), null)); + amClientImpl.requestContainerUpdate(container1, + UpdateContainerRequest.newInstance(container1.getVersion(), + container1.getId(), ContainerUpdateType.INCREASE_RESOURCE, + Resource.newInstance(4096, 1), null)); Assert.assertEquals(Resource.newInstance(4096, 1), - amClientImpl.change.get(container1.getId()).getValue()); + amClientImpl.change.get(container1.getId()).getValue().getCapability()); // verify new decrease request cancels old increase request for container1 - amClientImpl.requestContainerResourceChange( - container1, Resource.newInstance(512, 1)); + amClientImpl.requestContainerUpdate(container1, + UpdateContainerRequest.newInstance(container1.getVersion(), + container1.getId(), ContainerUpdateType.DECREASE_RESOURCE, + Resource.newInstance(512, 1), null)); Assert.assertEquals(Resource.newInstance(512, 1), - amClientImpl.change.get(container1.getId()).getValue()); + amClientImpl.change.get(container1.getId()).getValue().getCapability()); // request resource increase for container2 - amClientImpl.requestContainerResourceChange( - container2, Resource.newInstance(2048, 1)); + amClientImpl.requestContainerUpdate(container2, + UpdateContainerRequest.newInstance(container2.getVersion(), + container2.getId(), ContainerUpdateType.INCREASE_RESOURCE, + Resource.newInstance(2048, 1), null)); Assert.assertEquals(Resource.newInstance(2048, 1), - amClientImpl.change.get(container2.getId()).getValue()); + amClientImpl.change.get(container2.getId()).getValue().getCapability()); // verify release request will cancel pending change requests for the same // container - amClientImpl.requestContainerResourceChange( - container3, Resource.newInstance(2048, 1)); + amClientImpl.requestContainerUpdate(container3, + UpdateContainerRequest.newInstance(container3.getVersion(), + container3.getId(), ContainerUpdateType.INCREASE_RESOURCE, + Resource.newInstance(2048, 1), null)); Assert.assertEquals(3, amClientImpl.pendingChange.size()); amClientImpl.releaseAssignedContainer(container3.getId()); Assert.assertEquals(2, amClientImpl.pendingChange.size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index ac77446..39a7633 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -233,8 +234,11 @@ public class TestAMRMClientOnRMRestart { nm1.nodeHeartbeat(containerId.getApplicationAttemptId(), containerId.getContainerId(), ContainerState.RUNNING); dispatcher.await(); - amClient.requestContainerResourceChange( - container, Resource.newInstance(2048, 1)); + amClient.requestContainerUpdate( + container, UpdateContainerRequest.newInstance( + container.getVersion(), container.getId(), + ContainerUpdateType.INCREASE_RESOURCE, + Resource.newInstance(2048, 1), null)); it.remove(); allocateResponse = amClient.allocate(0.3f); http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index 802c207..305d18b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -44,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -66,13 +72,17 @@ import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; /** * Class that tests the allocation of OPPORTUNISTIC containers through the @@ -83,7 +93,6 @@ public class TestOpportunisticContainerAllocation { private static MiniYARNCluster yarnCluster = null; private static YarnClient yarnClient = null; private static List<NodeReport> nodeReports = null; - private static ApplicationAttemptId attemptId = null; private static int nodeCount = 3; private static final int ROLLING_INTERVAL_SEC = 13; @@ -92,12 +101,22 @@ public class TestOpportunisticContainerAllocation { private static Resource capability; private static Priority priority; private static Priority priority2; + private static Priority priority3; + private static Priority priority4; private static String node; private static String rack; private static String[] nodes; private static String[] racks; private final static int DEFAULT_ITERATION = 3; + // Per test.. + private ApplicationAttemptId attemptId = null; + private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null; + private long availMB; + private int availVCores; + private long allocMB; + private int allocVCores; + @BeforeClass public static void setup() throws Exception { // start minicluster @@ -106,7 +125,7 @@ public class TestOpportunisticContainerAllocation { YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, ROLLING_INTERVAL_SEC); conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS); - conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000); // set the minimum allocation so that resource decrease can go under 1024 conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setBoolean( @@ -129,7 +148,9 @@ public class TestOpportunisticContainerAllocation { priority = Priority.newInstance(1); priority2 = Priority.newInstance(2); - capability = Resource.newInstance(1024, 1); + priority3 = Priority.newInstance(3); + priority4 = Priority.newInstance(4); + capability = Resource.newInstance(512, 1); node = nodeReports.get(0).getNodeId().getHost(); rack = nodeReports.get(0).getRackName(); @@ -193,10 +214,35 @@ public class TestOpportunisticContainerAllocation { UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); appAttempt.getAMRMToken() .setService(ClientRMProxy.getAMRMTokenService(conf)); + + // start am rm client + amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient + .createAMRMClient(); + + //setting an instance NMTokenCache + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("Host", 10000, ""); } @After public void cancelApp() throws YarnException, IOException { + try { + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); + } finally { + if (amClient != null && + amClient.getServiceState() == Service.STATE.STARTED) { + amClient.stop(); + } + } yarnClient.killApplication(attemptId.getApplicationId()); attemptId = null; } @@ -214,43 +260,254 @@ public class TestOpportunisticContainerAllocation { } @Test(timeout = 60000) - public void testAMRMClient() throws YarnException, IOException { - AMRMClient<AMRMClient.ContainerRequest> amClient = null; + public void testPromotionFromAcquired() throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, capability).remoteRequest + .getNumContainers(); + + assertEquals(1, oppContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>(); + int iterationsLeft = 50; + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap<String, Token> receivedNMTokens = new HashMap<>(); + + updateMetrics("Before Opp Allocation"); + + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + allocatedOpportContainers.put(container.getId(), container); + removeCR(container); + } + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(oppContainersRequestedAny, allocatedContainerCount); + assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size()); + + updateMetrics("After Opp Allocation / Before Promotion"); + try { - // start am rm client - amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient(); + Container c = allocatedOpportContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + Assert.fail("Should throw Exception.."); + } catch (IllegalArgumentException e) { + System.out.println("## " + e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "target should be GUARANTEED and original should be OPPORTUNISTIC")); + } - //setting an instance NMTokenCache - amClient.setNMTokenCache(new NMTokenCache()); - //asserting we are not using the singleton instance cache - Assert.assertNotSame(NMTokenCache.getSingleton(), - amClient.getNMTokenCache()); + Container c = allocatedOpportContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + iterationsLeft = 120; + Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + if (allocResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + System.out.println("Got update.."); + updatedContainers.put(updatedContainer.getContainer().getId(), + updatedContainer); + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } - amClient.init(conf); - amClient.start(); + updateMetrics("After Promotion"); + + assertEquals(1, updatedContainers.size()); + for (ContainerId cId : allocatedOpportContainers.keySet()) { + Container orig = allocatedOpportContainers.get(cId); + UpdatedContainer updatedContainer = updatedContainers.get(cId); + assertNotNull(updatedContainer); + assertEquals(ExecutionType.GUARANTEED, + updatedContainer.getContainer().getExecutionType()); + assertEquals(orig.getResource(), + updatedContainer.getContainer().getResource()); + assertEquals(orig.getNodeId(), + updatedContainer.getContainer().getNodeId()); + assertEquals(orig.getVersion() + 1, + updatedContainer.getContainer().getVersion()); + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + amClient.ask.clear(); + } - amClient.registerApplicationMaster("Host", 10000, ""); + @Test(timeout = 60000) + public void testDemotionFromAcquired() throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); - testOpportunisticAllocation( - (AMRMClientImpl<AMRMClient.ContainerRequest>) amClient); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority3)); - testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient); + int guarContainersRequestedAny = amClient.getTable(0).get(priority3, + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + .remoteRequest.getNumContainers(); - amClient - .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, - null); + assertEquals(1, guarContainersRequestedAny); - } finally { - if (amClient != null && - amClient.getServiceState() == Service.STATE.STARTED) { - amClient.stop(); + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>(); + int iterationsLeft = 50; + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap<String, Token> receivedNMTokens = new HashMap<>(); + + updateMetrics("Before Guar Allocation"); + + while (allocatedContainerCount < guarContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + allocatedGuarContainers.put(container.getId(), container); + removeCR(container); + } + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < guarContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(guarContainersRequestedAny, allocatedContainerCount); + assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size()); + + updateMetrics("After Guar Allocation / Before Demotion"); + + try { + Container c = allocatedGuarContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + Assert.fail("Should throw Exception.."); + } catch (IllegalArgumentException e) { + System.out.println("## " + e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "target should be OPPORTUNISTIC and original should be GUARANTEED")); + } + + Container c = allocatedGuarContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + iterationsLeft = 120; + Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + if (allocResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + System.out.println("Got update.."); + updatedContainers.put(updatedContainer.getContainer().getId(), + updatedContainer); + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); } } + + updateMetrics("After Demotion"); + + assertEquals(1, updatedContainers.size()); + for (ContainerId cId : allocatedGuarContainers.keySet()) { + Container orig = allocatedGuarContainers.get(cId); + UpdatedContainer updatedContainer = updatedContainers.get(cId); + assertNotNull(updatedContainer); + assertEquals(ExecutionType.OPPORTUNISTIC, + updatedContainer.getContainer().getExecutionType()); + assertEquals(orig.getResource(), + updatedContainer.getContainer().getResource()); + assertEquals(orig.getNodeId(), + updatedContainer.getContainer().getNodeId()); + assertEquals(orig.getVersion() + 1, + updatedContainer.getContainer().getVersion()); + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + amClient.ask.clear(); } - private void testAllocation( - final AMRMClientImpl<AMRMClient.ContainerRequest> amClient) - throws YarnException, IOException { + @Test(timeout = 60000) + public void testMixedAllocationAndRelease() throws YarnException, + IOException { // setup container request assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); @@ -274,6 +531,28 @@ public class TestOpportunisticContainerAllocation { ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); + int containersRequestedNode = amClient.getTable(0).get(priority, + node, ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + int containersRequestedRack = amClient.getTable(0).get(priority, + rack, ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + int containersRequestedAny = amClient.getTable(0).get(priority, + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + .remoteRequest.getNumContainers(); + int oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, capability).remoteRequest + .getNumContainers(); + + assertEquals(4, containersRequestedNode); + assertEquals(4, containersRequestedRack); + assertEquals(4, containersRequestedAny); + assertEquals(2, oppContainersRequestedAny); + + assertEquals(4, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + amClient.removeContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( @@ -284,16 +563,16 @@ public class TestOpportunisticContainerAllocation { ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); - int containersRequestedNode = amClient.getTable(0).get(priority, + containersRequestedNode = amClient.getTable(0).get(priority, node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedRack = amClient.getTable(0).get(priority, + containersRequestedRack = amClient.getTable(0).get(priority, rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedAny = amClient.getTable(0).get(priority, + containersRequestedAny = amClient.getTable(0).get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); - int oppContainersRequestedAny = + oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); @@ -309,7 +588,7 @@ public class TestOpportunisticContainerAllocation { // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; int allocatedOpportContainerCount = 0; - int iterationsLeft = 10; + int iterationsLeft = 50; Set<ContainerId> releases = new TreeSet<>(); amClient.getNMTokenCache().clearCache(); @@ -324,8 +603,8 @@ public class TestOpportunisticContainerAllocation { assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); - allocatedContainerCount += allocResponse.getAllocatedContainers() - .size(); + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); for (Container container : allocResponse.getAllocatedContainers()) { if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { allocatedOpportContainerCount++; @@ -345,9 +624,9 @@ public class TestOpportunisticContainerAllocation { } } - assertEquals(allocatedContainerCount, - containersRequestedAny + oppContainersRequestedAny); - assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny); + assertEquals(containersRequestedAny + oppContainersRequestedAny, + allocatedContainerCount); + assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount); for (ContainerId rejectContainerId : releases) { amClient.releaseAssignedContainer(rejectContainerId); } @@ -395,26 +674,25 @@ public class TestOpportunisticContainerAllocation { /** * Tests allocation with requests comprising only opportunistic containers. */ - private void testOpportunisticAllocation( - final AMRMClientImpl<AMRMClient.ContainerRequest> amClient) - throws YarnException, IOException { + @Test(timeout = 60000) + public void testOpportunisticAllocation() throws YarnException, IOException { // setup container request assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + new AMRMClient.ContainerRequest(capability, null, null, priority3, 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + new AMRMClient.ContainerRequest(capability, null, null, priority3, 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); int oppContainersRequestedAny = - amClient.getTable(0).get(priority, ResourceRequest.ANY, + amClient.getTable(0).get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); @@ -456,9 +734,43 @@ public class TestOpportunisticContainerAllocation { } } + assertEquals(oppContainersRequestedAny, allocatedContainerCount); assertEquals(1, receivedNMTokens.values().size()); } + private void removeCR(Container container) { + List<? extends Collection<AMRMClient.ContainerRequest>> + matchingRequests = amClient.getMatchingRequests(container + .getPriority(), + ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, + container.getResource()); + Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>(); + for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) { + for (AMRMClient.ContainerRequest cr : rc) { + toRemove.add(cr); + } + } + for (AMRMClient.ContainerRequest cr : toRemove) { + amClient.removeContainerRequest(cr); + } + } + + private void updateMetrics(String msg) { + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler)yarnCluster.getResourceManager() + .getResourceScheduler(); + availMB = scheduler.getRootQueueMetrics().getAvailableMB(); + availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores(); + allocMB = scheduler.getRootQueueMetrics().getAllocatedMB(); + allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); + System.out.println("## METRICS (" + msg + ")==>"); + System.out.println(" : availMB=" + availMB + ", " + + "availVCores=" +availVCores + ", " + + "allocMB=" + allocMB + ", " + + "allocVCores=" + allocVCores + ", "); + System.out.println("<== ##"); + } + private void sleep(int sleepTime) { try { Thread.sleep(sleepTime); http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java index fb6c1a7..8ff9d9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java @@ -74,6 +74,22 @@ public class UpdateContainerErrorPBImpl extends UpdateContainerError { } @Override + public int getCurrentContainerVersion() { + YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasCurrentContainerVersion()) { + return 0; + } + return p.getCurrentContainerVersion(); + } + + @Override + public void setCurrentContainerVersion(int containerVersion) { + maybeInitBuilder(); + builder.setCurrentContainerVersion(containerVersion); + } + + @Override public UpdateContainerRequest getUpdateContainerRequest() { YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto : builder; http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 94bfd58..224a1da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -188,19 +188,25 @@ public class RMServerUtils { } } } - checkAndcreateUpdateError(updateErrors, updateReq, msg); + checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg); } return updateRequests; } private static void checkAndcreateUpdateError( List<UpdateContainerError> errors, UpdateContainerRequest updateReq, - String msg) { + RMContainer rmContainer, String msg) { if (msg != null) { UpdateContainerError updateError = RECORD_FACTORY .newRecordInstance(UpdateContainerError.class); updateError.setReason(msg); updateError.setUpdateContainerRequest(updateReq); + if (rmContainer != null) { + updateError.setCurrentContainerVersion( + rmContainer.getContainer().getVersion()); + } else { + updateError.setCurrentContainerVersion(-1); + } errors.add(updateError); } } @@ -216,9 +222,7 @@ public class RMServerUtils { // version if (msg == null && updateReq.getContainerVersion() != rmContainer.getContainer().getVersion()) { - msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" - + updateReq.getContainerVersion() + "|" - + rmContainer.getContainer().getVersion(); + msg = INCORRECT_CONTAINER_VERSION_ERROR; } // No more than 1 container update per request. if (msg == null && http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 641ef64..b083642 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -251,8 +251,11 @@ public class TestOpportunisticContainerAllocatorAMService { Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); - Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0", + Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR", allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(0, + allocateResponse.getUpdateErrors().get(0) + .getCurrentContainerVersion()); Assert.assertEquals(container.getId(), allocateResponse.getUpdateErrors().get(0) .getUpdateContainerRequest().getContainerId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf106fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index c5829cf..74cecf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -275,8 +275,10 @@ public class TestIncreaseAllocationExpirer { Resources.createResource(5 * GB), null))); List<UpdateContainerError> updateErrors = response.getUpdateErrors(); Assert.assertEquals(1, updateErrors.size()); - Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|0|1", + Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR", updateErrors.get(0).getReason()); + Assert.assertEquals(1, + updateErrors.get(0).getCurrentContainerVersion()); // am1 asks to change containerId2 from 3GB to 5GB am1.sendContainerResizingRequest(Collections.singletonList( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org