YARN-5587. Add support for resource profiles. (vvasudev via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c9e1ed84 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c9e1ed84 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c9e1ed84 Branch: refs/heads/YARN-3926 Commit: c9e1ed84bce3c78c359a1876280c0eac23dcc049 Parents: f01f86c Author: Arun Suresh <asur...@apache.org> Authored: Tue Nov 15 01:01:07 2016 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Mon Aug 21 16:52:55 2017 -0700 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 4 + .../RegisterApplicationMasterResponse.java | 8 + .../yarn/api/records/ProfileCapability.java | 94 ++++++++++- .../hadoop/yarn/api/records/Resource.java | 14 ++ .../yarn/api/records/ResourceInformation.java | 57 ++++++- .../yarn/api/records/ResourceRequest.java | 43 ++++- .../hadoop-yarn/hadoop-yarn-client/pom.xml | 1 + .../hadoop/yarn/client/api/AMRMClient.java | 117 +++++++++++++- .../yarn/client/api/impl/AMRMClientImpl.java | 152 ++++++++++------- .../client/api/impl/RemoteRequestsTable.java | 109 +++++++++---- .../yarn/client/api/impl/TestAMRMClient.java | 141 ++++++++++++++-- .../impl/TestAMRMClientContainerRequest.java | 8 +- .../api/impl/TestDistributedScheduling.java | 12 +- .../yarn/client/api/impl/TestNMClient.java | 5 +- .../TestOpportunisticContainerAllocation.java | 31 ++-- .../src/test/resources/resource-profiles.json | 18 +++ ...RegisterApplicationMasterResponsePBImpl.java | 58 +++++++ .../api/records/impl/pb/ResourcePBImpl.java | 4 +- .../records/impl/pb/ResourceRequestPBImpl.java | 41 ++++- .../yarn/util/resource/ResourceUtils.java | 161 ++++++++++++++++++- .../hadoop/yarn/util/resource/Resources.java | 10 +- .../ApplicationMasterService.java | 1 + .../resourcemanager/DefaultAMSProcessor.java | 8 + .../server/resourcemanager/RMServerUtils.java | 50 ++++++ .../resource/ResourceProfilesManagerImpl.java | 4 + 
.../scheduler/AbstractYarnScheduler.java | 44 +++++ .../scheduler/ClusterNodeTracker.java | 3 +- .../scheduler/SchedulerUtils.java | 10 ++ .../scheduler/capacity/CapacityScheduler.java | 4 +- .../scheduler/fair/FairScheduler.java | 4 +- .../scheduler/fifo/FifoScheduler.java | 13 +- .../yarn/server/resourcemanager/MockRM.java | 2 + .../server/resourcemanager/TestAppManager.java | 1 + .../TestApplicationMasterService.java | 35 ++++ .../scheduler/fair/TestFairScheduler.java | 4 + .../hadoop/yarn/server/MiniYARNCluster.java | 2 + 36 files changed, 1100 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 6825a36..ce7a9c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -154,6 +154,10 @@ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> </Match> <Match> + <Class name="org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl$ProfileCapabilityComparator" /> + <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> + </Match> + <Match> <Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" /> <Field name="builder" /> <Bug pattern="SE_BAD_FIELD" /> http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 0b886dd..8fa8563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -204,4 +204,12 @@ public abstract class RegisterApplicationMasterResponse { @Unstable public abstract void setSchedulerResourceTypes( EnumSet<SchedulerResourceTypes> types); + + @Public + @Unstable + public abstract Map<String, Resource> getResourceProfiles(); + + @Private + @Unstable + public abstract void setResourceProfiles(Map<String, Resource> profiles); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java index 0a93b89..faaddd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java @@ -18,41 +18,93 @@ package org.apache.hadoop.yarn.api.records; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.util.Records; +import java.util.Map; + /** * Class to capture capability requirements when using resource profiles. The * ProfileCapability is meant to be used as part of the ResourceRequest. A * profile capability has two pieces - the resource profile name and the * overrides. The resource profile specifies the name of the resource profile * to be used and the capability override is the overrides desired on specific - * resource types. For example, you could use the "minimum" profile and set the - * memory in the capability override to 4096M. This implies that you wish for - * the resources specified in the "minimum" profile but with 4096M memory. The - * conversion from the ProfileCapability to the Resource class with the actual - * resource requirements will be done by the ResourceManager, which has the - * actual profile to Resource mapping. + * resource types. + * + * For example, if you have a resource profile "small" that maps to + * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to + * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on + * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}. + * + * Note that the conversion from the ProfileCapability to the Resource class + * with the actual resource requirements will be done by the ResourceManager, + * which has the actual profile to Resource mapping. 
+ * */ @InterfaceAudience.Public @InterfaceStability.Unstable public abstract class ProfileCapability { + public static final String DEFAULT_PROFILE = "default"; + + public static ProfileCapability newInstance(Resource override) { + return newInstance(DEFAULT_PROFILE, override); + } + + public static ProfileCapability newInstance(String profile) { + Preconditions + .checkArgument(profile != null, "The profile name cannot be null"); + ProfileCapability obj = Records.newRecord(ProfileCapability.class); + obj.setProfileName(profile); + obj.setProfileCapabilityOverride(Resource.newInstance(0, 0)); + return obj; + } + public static ProfileCapability newInstance(String profile, Resource override) { + Preconditions + .checkArgument(profile != null, "The profile name cannot be null"); ProfileCapability obj = Records.newRecord(ProfileCapability.class); obj.setProfileName(profile); obj.setProfileCapabilityOverride(override); return obj; } + /** + * Get the profile name. + * @return the profile name + */ public abstract String getProfileName(); + /** + * Get the profile capability override. + * @return Resource object containing the override. + */ public abstract Resource getProfileCapabilityOverride(); + /** + * Set the resource profile name. + * @param profileName the resource profile name + */ public abstract void setProfileName(String profileName); + /** + * Set the capability override to override specific resource types on the + * resource profile. + * + * For example, if you have a resource profile "small" that maps to + * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to + * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on + * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}. + * + * Note that the conversion from the ProfileCapability to the Resource class + * with the actual resource requirements will be done by the ResourceManager, + * which has the actual profile to Resource mapping. 
+ * + * @param r Resource object containing the capability override + */ public abstract void setProfileCapabilityOverride(Resource r); @Override @@ -85,4 +137,34 @@ public abstract class ProfileCapability { return "{ profile: " + this.getProfileName() + ", capabilityOverride: " + this.getProfileCapabilityOverride() + " }"; } + + /** + * Get a representation of the capability as a Resource object. + * @param capability the capability we wish to convert + * @param resourceProfilesMap map of profile name to Resource object + * @return Resource object representing the capability + */ + public static Resource toResource(ProfileCapability capability, + Map<String, Resource> resourceProfilesMap) { + Preconditions + .checkArgument(capability != null, "Capability cannot be null"); + Preconditions.checkArgument(resourceProfilesMap != null, + "Resource profiles map cannot be null"); + Resource resource = Resource.newInstance(0, 0); + + if (resourceProfilesMap.containsKey(capability.getProfileName())) { + resource = Resource + .newInstance(resourceProfilesMap.get(capability.getProfileName())); + } + + if(capability.getProfileCapabilityOverride()!= null) { + for (Map.Entry<String, ResourceInformation> entry : capability + .getProfileCapabilityOverride().getResources().entrySet()) { + if (entry.getValue() != null && entry.getValue().getValue() != 0) { + resource.setResourceInformation(entry.getKey(), entry.getValue()); + } + } + } + return resource; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 507247e..c349a32 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.api.records; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -101,6 +103,18 @@ public abstract class Resource implements Comparable<Resource> { return new SimpleResource(memory, vCores); } + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static Resource newInstance(Resource resource) { + Resource ret = Resource.newInstance(0, 0); + for (Map.Entry<String, ResourceInformation> entry : resource.getResources() + .entrySet()) { + ret.setResourceInformation(entry.getKey(), + ResourceInformation.newInstance(entry.getValue())); + } + return ret; + } + /** * This method is DEPRECATED: * Use {@link Resource#getMemorySize()} instead http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index a17e81b..7d74efc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> { private String units; private ResourceTypes resourceType; private Long value; + private Long minimumAllocation; + private Long maximumAllocation; private static final String MEMORY_URI = "memory-mb"; private static final String VCORES_URI = "vcores"; @@ -118,6 +120,42 @@ public class ResourceInformation implements Comparable<ResourceInformation> { } /** + * Get the minimum allocation for the resource. + * + * @return the minimum allocation for the resource + */ + public Long getMinimumAllocation() { + return minimumAllocation; + } + + /** + * Set the minimum allocation for the resource. + * + * @param minimumAllocation the minimum allocation for the resource + */ + public void setMinimumAllocation(Long minimumAllocation) { + this.minimumAllocation = minimumAllocation; + } + + /** + * Get the maximum allocation for the resource. + * + * @return the maximum allocation for the resource + */ + public Long getMaximumAllocation() { + return maximumAllocation; + } + + /** + * Set the maximum allocation for the resource. + * + * @param maximumAllocation the maximum allocation for the resource + */ + public void setMaximumAllocation(Long maximumAllocation) { + this.maximumAllocation = maximumAllocation; + } + + /** * Create a new instance of ResourceInformation from another object. 
* * @param other the object from which the new object should be created @@ -129,33 +167,41 @@ public class ResourceInformation implements Comparable<ResourceInformation> { ret.setResourceType(other.getResourceType()); ret.setUnits(other.getUnits()); ret.setValue(other.getValue()); + ret.setMinimumAllocation(other.getMinimumAllocation()); + ret.setMaximumAllocation(other.getMaximumAllocation()); return ret; } public static ResourceInformation newInstance(String name, String units, - Long value, ResourceTypes type) { + Long value, ResourceTypes type, Long minimumAllocation, + Long maximumAllocation) { ResourceInformation ret = new ResourceInformation(); ret.setName(name); ret.setResourceType(type); ret.setUnits(units); ret.setValue(value); + ret.setMinimumAllocation(minimumAllocation); + ret.setMaximumAllocation(maximumAllocation); return ret; } public static ResourceInformation newInstance(String name, String units, Long value) { return ResourceInformation - .newInstance(name, units, value, ResourceTypes.COUNTABLE); + .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L, + Long.MAX_VALUE); } public static ResourceInformation newInstance(String name, String units) { return ResourceInformation - .newInstance(name, units, 0L, ResourceTypes.COUNTABLE); + .newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L, + Long.MAX_VALUE); } public static ResourceInformation newInstance(String name, Long value) { return ResourceInformation - .newInstance(name, "", value, ResourceTypes.COUNTABLE); + .newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L, + Long.MAX_VALUE); } public static ResourceInformation newInstance(String name) { @@ -165,7 +211,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> { @Override public String toString() { return "name: " + this.name + ", units: " + this.units + ", type: " - + resourceType + ", value: " + value; + + resourceType + ", value: " + value + ", minimum allocation: " + + minimumAllocation + ", 
maximum allocation: " + maximumAllocation; } public String getShorthandRepresentation() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 5bedc87..c1339b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -98,7 +98,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { .resourceName(hostName).capability(capability) .numContainers(numContainers).relaxLocality(relaxLocality) .nodeLabelExpression(labelExpression) - .executionTypeRequest(executionTypeRequest).build(); + .executionTypeRequest(executionTypeRequest).profileCapability(null) + .build(); + } + + @Public + @Evolving + public static ResourceRequest newInstance(Priority priority, String hostName, + Resource capability, int numContainers, boolean relaxLocality, + String labelExpression, ExecutionTypeRequest executionTypeRequest, + ProfileCapability profile) { + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality) + .nodeLabelExpression(labelExpression) + .executionTypeRequest(executionTypeRequest).profileCapability(profile) + .build(); } @Public @@ -124,6 +139,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { resourceRequest.setRelaxLocality(true); 
resourceRequest.setExecutionTypeRequest( ExecutionTypeRequest.newInstance()); + resourceRequest.setProfileCapability(null); } /** @@ -238,6 +254,21 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { } /** + * Set the <code>resourceProfile</code> of the request. + * @see ResourceRequest#setProfileCapability(ProfileCapability) + * @param profileCapability + * <code>profileCapability</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder profileCapability( + ProfileCapability profileCapability) { + resourceRequest.setProfileCapability(profileCapability); + return this; + } + + /** * Return generated {@link ResourceRequest} object. * @return {@link ResourceRequest} */ @@ -454,6 +485,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { @Evolving public abstract void setNodeLabelExpression(String nodelabelExpression); + @Public + @Evolving + public abstract ProfileCapability getProfileCapability(); + + @Public + @Evolving + public abstract void setProfileCapability(ProfileCapability p); + /** * Get the optional <em>ID</em> corresponding to this allocation request. This * ID is an identifier for different {@code ResourceRequest}s from the <b>same @@ -529,12 +568,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { Resource capability = getCapability(); String hostName = getResourceName(); Priority priority = getPriority(); + ProfileCapability profile = getProfileCapability(); result = prime * result + ((capability == null) ? 0 : capability.hashCode()); result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); result = prime * result + getNumContainers(); result = prime * result + ((priority == null) ? 0 : priority.hashCode()); result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode(); + result = prime * result + ((profile == null) ? 
0 : profile.hashCode()); return result; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index b83bff8..8cbf4c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -144,6 +144,7 @@ <exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude> + <exclude>src/test/resources/resource-profiles.json</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 69f3777..a11275b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; @@ -125,6 +126,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends private boolean relaxLocality; private String nodeLabelsExpression; private ExecutionTypeRequest executionTypeRequest; + private String resourceProfile; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -171,6 +173,26 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends this(capability, nodes, racks, priority, allocationRequestId, true, null, ExecutionTypeRequest.newInstance()); } + /** + * Instantiates a {@link ContainerRequest} with the given constraints and + * locality relaxation enabled. + * + * @param capability + * The {@link ProfileCapability} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + */ + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority) { + this(capability, nodes, racks, priority, 0, true, null); + } /** * Instantiates a {@link ContainerRequest} with the given constraints. @@ -199,6 +221,29 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends * Instantiates a {@link ContainerRequest} with the given constraints. * * @param capability + * The {@link ProfileCapability} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. 
The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + */ + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority, boolean relaxLocality) { + this(capability, nodes, racks, priority, 0, relaxLocality, null); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability * The {@link Resource} to be requested for each container. * @param nodes * Any hosts to request that the containers are placed on. @@ -285,6 +330,59 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends relaxLocality, nodeLabelsExpression, ExecutionTypeRequest.newInstance()); } + + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority, long allocationRequestId, + boolean relaxLocality, String nodeLabelsExpression) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, nodeLabelsExpression, + ExecutionTypeRequest.newInstance()); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param allocationRequestId + * The allocationRequestId of the request. 
To be used as a tracking + * id to match Containers allocated against this request. Will + * default to 0 if not specified. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource, now we only support + * asking for only a single node label + * @param executionTypeRequest + * Set the execution type of the container request. + */ + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, long allocationRequestId, boolean relaxLocality, + String nodeLabelsExpression, + ExecutionTypeRequest executionTypeRequest) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, nodeLabelsExpression, executionTypeRequest, + ProfileCapability.DEFAULT_PROFILE); + } + + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority, long allocationRequestId, + boolean relaxLocality, String nodeLabelsExpression, + ExecutionTypeRequest executionTypeRequest) { + this(capability.getProfileCapabilityOverride(), nodes, racks, priority, + allocationRequestId, relaxLocality, nodeLabelsExpression, + executionTypeRequest, capability.getProfileName()); + } /** * Instantiates a {@link ContainerRequest} with the given constraints. @@ -312,11 +410,13 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends * asking for only a single node label * @param executionTypeRequest * Set the execution type of the container request. 
+ * @param profile + * Set the resource profile for the container request */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, long allocationRequestId, boolean relaxLocality, String nodeLabelsExpression, - ExecutionTypeRequest executionTypeRequest) { + ExecutionTypeRequest executionTypeRequest, String profile) { this.allocationRequestId = allocationRequestId; this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); @@ -325,6 +425,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends this.relaxLocality = relaxLocality; this.nodeLabelsExpression = nodeLabelsExpression; this.executionTypeRequest = executionTypeRequest; + this.resourceProfile = profile; sanityCheck(); } @@ -376,6 +477,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends return executionTypeRequest; } + public String getResourceProfile() { + return resourceProfile; + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); @@ -383,6 +488,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends sb.append("AllocationRequestId[").append(allocationRequestId).append("]"); sb.append("ExecutionTypeRequest[").append(executionTypeRequest) .append("]"); + sb.append("Resource Profile[").append(resourceProfile).append("]"); return sb.toString(); } @@ -635,6 +741,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends " AMRMClient is expected to implement this !!"); } + + @InterfaceStability.Evolving + public List<? 
extends Collection<T>> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + ProfileCapability capability) { + throw new UnsupportedOperationException("The sub-class extending" + + " AMRMClient is expected to implement this !!"); + } + /** * Get outstanding <code>ContainerRequest</code>s matching the given * allocationRequestId. These ContainerRequests should have been added via http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 7a21bc6..8e66c20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -105,56 +106,56 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { protected final Set<String> blacklistedNodes = new HashSet<String>(); protected final Set<String> blacklistAdditions = new HashSet<String>(); protected final Set<String> 
blacklistRemovals = new HashSet<String>(); + + protected Map<String, Resource> resourceProfilesMap; static class ResourceRequestInfo<T> { ResourceRequest remoteRequest; LinkedHashSet<T> containerRequests; - + ResourceRequestInfo(Long allocationRequestId, Priority priority, - String resourceName, Resource capability, boolean relaxLocality) { + String resourceName, Resource capability, boolean relaxLocality, + String resourceProfile) { + ProfileCapability profileCapability = ProfileCapability + .newInstance(resourceProfile, capability); remoteRequest = ResourceRequest.newBuilder().priority(priority) .resourceName(resourceName).capability(capability).numContainers(0) - .allocationRequestId(allocationRequestId) - .relaxLocality(relaxLocality).build(); + .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality) + .profileCapability(profileCapability).build(); containerRequests = new LinkedHashSet<T>(); } } /** - * Class compares Resource by memory then cpu in reverse order + * Class compares Resource by memory, then cpu and then the remaining resource + * types in reverse order. 
*/ - static class ResourceReverseMemoryThenCpuComparator implements - Comparator<Resource>, Serializable { - static final long serialVersionUID = 12345L; - @Override - public int compare(Resource arg0, Resource arg1) { - long mem0 = arg0.getMemorySize(); - long mem1 = arg1.getMemorySize(); - long cpu0 = arg0.getVirtualCores(); - long cpu1 = arg1.getVirtualCores(); - if(mem0 == mem1) { - if(cpu0 == cpu1) { - return 0; - } - if(cpu0 < cpu1) { - return 1; - } - return -1; - } - if(mem0 < mem1) { - return 1; - } - return -1; - } + static class ProfileCapabilityComparator<T extends ProfileCapability> + implements Comparator<T> { + + HashMap<String, Resource> resourceProfilesMap; + + public ProfileCapabilityComparator( + HashMap<String, Resource> resourceProfileMap) { + this.resourceProfilesMap = resourceProfileMap; + } + + public int compare(T arg0, T arg1) { + Resource resource0 = + ProfileCapability.toResource(arg0, resourceProfilesMap); + Resource resource1 = + ProfileCapability.toResource(arg1, resourceProfilesMap); + return resource1.compareTo(resource0); + } } - static boolean canFit(Resource arg0, Resource arg1) { - long mem0 = arg0.getMemorySize(); - long mem1 = arg1.getMemorySize(); - long cpu0 = arg0.getVirtualCores(); - long cpu1 = arg1.getVirtualCores(); - - return (mem0 <= mem1 && cpu0 <= cpu1); + boolean canFit(ProfileCapability arg0, ProfileCapability arg1) { + Resource resource0 = + ProfileCapability.toResource(arg0, resourceProfilesMap); + Resource resource1 = + ProfileCapability.toResource(arg1, resourceProfilesMap); + return Resources.fitsIn(resource0, resource1); + } private final Map<Long, RemoteRequestsTable<T>> remoteRequests = @@ -233,6 +234,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { return registerApplicationMaster(); } + @SuppressWarnings("unchecked") private RegisterApplicationMasterResponse registerApplicationMaster() throws YarnException, IOException { RegisterApplicationMasterRequest request = @@ 
-245,6 +247,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { if (!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } + this.resourceProfilesMap = response.getResourceProfiles(); } return response; } @@ -416,13 +419,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - ResourceRequest rr = ResourceRequest.newBuilder() - .priority(r.getPriority()).resourceName(r.getResourceName()) - .capability(r.getCapability()).numContainers(r.getNumContainers()) - .relaxLocality(r.getRelaxLocality()) - .nodeLabelExpression(r.getNodeLabelExpression()) - .executionTypeRequest(r.getExecutionTypeRequest()) - .allocationRequestId(r.getAllocationRequestId()).build(); + ResourceRequest rr = + ResourceRequest.newBuilder().priority(r.getPriority()) + .resourceName(r.getResourceName()).capability(r.getCapability()) + .numContainers(r.getNumContainers()) + .relaxLocality(r.getRelaxLocality()) + .nodeLabelExpression(r.getNodeLabelExpression()) + .executionTypeRequest(r.getExecutionTypeRequest()) + .allocationRequestId(r.getAllocationRequestId()) + .profileCapability(r.getProfileCapability()).build(); askList.add(rr); } return askList; @@ -504,6 +509,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { public synchronized void addContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); + ProfileCapability profileCapability = ProfileCapability + .newInstance(req.getResourceProfile(), req.getCapability()); Set<String> dedupedRacks = new HashSet<String>(); if (req.getRacks() != null) { dedupedRacks.addAll(req.getRacks()); @@ -516,6 +523,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { Set<String> 
inferredRacks = resolveRacks(req.getNodes()); inferredRacks.removeAll(dedupedRacks); + checkResourceProfile(req.getResourceProfile()); + // check that specific and non-specific requests cannot be mixed within a // priority checkLocalityRelaxationConflict(req.getAllocationRequestId(), @@ -540,26 +549,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, - req.getExecutionTypeRequest(), req.getCapability(), req, true, + req.getExecutionTypeRequest(), profileCapability, req, true, req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - req.getCapability(), req, true, req.getNodeLabelExpression()); + profileCapability, req, true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - req.getCapability(), req, req.getRelaxLocality(), + profileCapability, req, req.getRelaxLocality(), req.getNodeLabelExpression()); } // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getExecutionTypeRequest(), req.getCapability(), req, + req.getExecutionTypeRequest(), profileCapability, req, req.getRelaxLocality(), req.getNodeLabelExpression()); } @@ -567,6 +576,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { public synchronized void removeContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); + ProfileCapability profileCapability = ProfileCapability + .newInstance(req.getResourceProfile(), req.getCapability()); Set<String> allRacks = new HashSet<String>(); if (req.getRacks() != null) { allRacks.addAll(req.getRacks()); @@ -577,17 +588,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends 
AMRMClient<T> { if (req.getNodes() != null) { for (String node : new HashSet<String>(req.getNodes())) { decResourceRequest(req.getPriority(), node, - req.getExecutionTypeRequest(), req.getCapability(), req); + req.getExecutionTypeRequest(), profileCapability, req); } } for (String rack : allRacks) { decResourceRequest(req.getPriority(), rack, - req.getExecutionTypeRequest(), req.getCapability(), req); + req.getExecutionTypeRequest(), profileCapability, req); } decResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getExecutionTypeRequest(), req.getCapability(), req); + req.getExecutionTypeRequest(), profileCapability, req); } @Override @@ -686,6 +697,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { public synchronized List<? extends Collection<T>> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, Resource capability) { + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); + return getMatchingRequests(priority, resourceName, executionType, + profileCapability); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized List<? 
extends Collection<T>> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + ProfileCapability capability) { Preconditions.checkArgument(capability != null, "The Resource to be requested should not be null "); Preconditions.checkArgument(priority != null, @@ -695,22 +717,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { RemoteRequestsTable remoteRequestsTable = getTable(0); if (null != remoteRequestsTable) { - List<ResourceRequestInfo<T>> matchingRequests = - remoteRequestsTable.getMatchingRequests(priority, resourceName, - executionType, capability); + List<ResourceRequestInfo<T>> matchingRequests = remoteRequestsTable + .getMatchingRequests(priority, resourceName, executionType, + capability); if (null != matchingRequests) { // If no exact match. Container may be larger than what was requested. // get all resources <= capability. map is reverse sorted. for (ResourceRequestInfo<T> resReqInfo : matchingRequests) { - if (canFit(resReqInfo.remoteRequest.getCapability(), capability) && - !resReqInfo.containerRequests.isEmpty()) { + if (canFit(resReqInfo.remoteRequest.getProfileCapability(), + capability) && !resReqInfo.containerRequests.isEmpty()) { list.add(resReqInfo.containerRequests); } } } } // no match found - return list; + return list; } private Set<String> resolveRacks(List<String> nodes) { @@ -758,6 +780,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } } } + + private void checkResourceProfile(String profile) { + if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty() + && !resourceProfilesMap.containsKey(profile)) { + throw new InvalidContainerRequestException( + "Invalid profile name, valid profile names are " + resourceProfilesMap + .keySet()); + } + } /** * Valid if a node label expression specified on container request is valid or @@ -845,12 +876,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { 
} private void addResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req, + ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req, boolean relaxLocality, String labelExpression) { RemoteRequestsTable<T> remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable == null) { remoteRequestsTable = new RemoteRequestsTable<T>(); + if (this.resourceProfilesMap instanceof HashMap) { + remoteRequestsTable.setResourceComparator( + new ProfileCapabilityComparator((HashMap) resourceProfilesMap)); + } putTable(req.getAllocationRequestId(), remoteRequestsTable); } @SuppressWarnings("unchecked") @@ -863,6 +898,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { addResourceRequestToAsk(resourceRequestInfo.remoteRequest); if (LOG.isDebugEnabled()) { + LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest); LOG.debug("addResourceRequest:" + " applicationId=" + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" @@ -872,7 +908,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } private void decResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req) { + ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { RemoteRequestsTable<T> remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable != null) { @@ -882,7 +918,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { execTypeReq, capability, req); // send the ResourceRequest to RM even if is 0 because it needs to // override a previously sent value. 
If ResourceRequest was not sent - // previously then sending 0 ought to be a no-op on RM + // previously then sending 0 ought to be a no-op on RM if (resourceRequestInfo != null) { addResourceRequestToAsk(resourceRequestInfo.remoteRequest); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java index 110ca79..135e1db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import java.util.Collection; import java.util.HashMap; @@ -35,43 +35,42 @@ import java.util.TreeMap; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo; -import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator; class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ private static final Log LOG = 
LogFactory.getLog(RemoteRequestsTable.class); - static ResourceReverseMemoryThenCpuComparator resourceComparator = - new ResourceReverseMemoryThenCpuComparator(); + private ProfileCapabilityComparator resourceComparator; /** * Nested Iterator that iterates over just the ResourceRequestInfo * object. */ class RequestInfoIterator implements Iterator<ResourceRequestInfo> { - private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource, + private Iterator<Map<String, Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>> iLocMap; - private Iterator<Map<ExecutionType, TreeMap<Resource, + private Iterator<Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>> iExecTypeMap; - private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap; + private Iterator<TreeMap<ProfileCapability, ResourceRequestInfo>> iCapMap; private Iterator<ResourceRequestInfo> iResReqInfo; public RequestInfoIterator(Iterator<Map<String, - Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>> + Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>> iLocationMap) { this.iLocMap = iLocationMap; if (iLocMap.hasNext()) { iExecTypeMap = iLocMap.next().values().iterator(); } else { iExecTypeMap = - new LinkedList<Map<ExecutionType, TreeMap<Resource, + new LinkedList<Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>().iterator(); } if (iExecTypeMap.hasNext()) { iCapMap = iExecTypeMap.next().values().iterator(); } else { iCapMap = - new LinkedList<TreeMap<Resource, ResourceRequestInfo>>() + new LinkedList<TreeMap<ProfileCapability, ResourceRequestInfo>>() .iterator(); } if (iCapMap.hasNext()) { @@ -113,7 +112,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ // Nest map with Primary key : // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource) // and value : ResourceRequestInfo - private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource, + private Map<Priority, Map<String, 
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>(); @Override @@ -122,8 +121,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ } ResourceRequestInfo get(Priority priority, String location, - ExecutionType execType, Resource capability) { - TreeMap<Resource, ResourceRequestInfo> capabilityMap = + ExecutionType execType, ProfileCapability capability) { + TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap = getCapabilityMap(priority, location, execType); if (capabilityMap == null) { return null; @@ -131,9 +130,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ return capabilityMap.get(capability); } + @SuppressWarnings("unchecked") void put(Priority priority, String resourceName, ExecutionType execType, - Resource capability, ResourceRequestInfo resReqInfo) { - Map<String, Map<ExecutionType, TreeMap<Resource, + ProfileCapability capability, ResourceRequestInfo resReqInfo) { + Map<String, Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority); if (locationMap == null) { @@ -143,8 +143,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ LOG.debug("Added priority=" + priority); } } - Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap = - locationMap.get(resourceName); + Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>> + execTypeMap = locationMap.get(resourceName); if (execTypeMap == null) { execTypeMap = new HashMap<>(); locationMap.put(resourceName, execTypeMap); @@ -152,9 +152,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ LOG.debug("Added resourceName=" + resourceName); } } - TreeMap<Resource, ResourceRequestInfo> capabilityMap = + TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap = execTypeMap.get(execType); if (capabilityMap == null) { + // this can happen if the user doesn't register 
with the RM before + // calling addResourceRequest + if (resourceComparator == null) { + resourceComparator = new ProfileCapabilityComparator(new HashMap<>()); + } capabilityMap = new TreeMap<>(resourceComparator); execTypeMap.put(execType, capabilityMap); if (LOG.isDebugEnabled()) { @@ -165,9 +170,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ } ResourceRequestInfo remove(Priority priority, String resourceName, - ExecutionType execType, Resource capability) { + ExecutionType execType, ProfileCapability capability) { ResourceRequestInfo retVal = null; - Map<String, Map<ExecutionType, TreeMap<Resource, + Map<String, Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority); if (locationMap == null) { if (LOG.isDebugEnabled()) { @@ -175,7 +180,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ } return null; } - Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> + Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>> execTypeMap = locationMap.get(resourceName); if (execTypeMap == null) { if (LOG.isDebugEnabled()) { @@ -183,7 +188,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ } return null; } - TreeMap<Resource, ResourceRequestInfo> capabilityMap = + TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap = execTypeMap.get(execType); if (capabilityMap == null) { if (LOG.isDebugEnabled()) { @@ -204,14 +209,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ return retVal; } - Map<String, Map<ExecutionType, TreeMap<Resource, + Map<String, Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>> getLocationMap(Priority priority) { return remoteRequestsTable.get(priority); } - Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> + Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>> getExecutionTypeMap(Priority priority, String location) { - 
Map<String, Map<ExecutionType, TreeMap<Resource, + Map<String, Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>> locationMap = getLocationMap(priority); if (locationMap == null) { return null; @@ -219,10 +224,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ return locationMap.get(location); } - TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority + TreeMap<ProfileCapability, ResourceRequestInfo> getCapabilityMap(Priority priority, String location, ExecutionType execType) { - Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> + Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>> executionTypeMap = getExecutionTypeMap(priority, location); if (executionTypeMap == null) { return null; @@ -236,7 +241,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ List retList = new LinkedList<>(); for (String location : locations) { for (ExecutionType eType : ExecutionType.values()) { - TreeMap<Resource, ResourceRequestInfo> capabilityMap = + TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap = getCapabilityMap(priority, location, eType); if (capabilityMap != null) { retList.addAll(capabilityMap.values()); @@ -248,9 +253,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ List<ResourceRequestInfo> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, - Resource capability) { + ProfileCapability capability) { List<ResourceRequestInfo> list = new LinkedList<>(); - TreeMap<Resource, ResourceRequestInfo> capabilityMap = + TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap = getCapabilityMap(priority, resourceName, executionType); if (capabilityMap != null) { ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability); @@ -266,14 +271,15 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ @SuppressWarnings("unchecked") ResourceRequestInfo addResourceRequest(Long 
allocationRequestId, Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, - Resource capability, T req, boolean relaxLocality, + ProfileCapability capability, T req, boolean relaxLocality, String labelExpression) { - ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, - execTypeReq.getExecutionType(), capability); + ResourceRequestInfo resourceRequestInfo = + get(priority, resourceName, execTypeReq.getExecutionType(), capability); if (resourceRequestInfo == null) { resourceRequestInfo = new ResourceRequestInfo(allocationRequestId, priority, resourceName, - capability, relaxLocality); + capability.getProfileCapabilityOverride(), relaxLocality, + capability.getProfileName()); put(priority, resourceName, execTypeReq.getExecutionType(), capability, resourceRequestInfo); } @@ -288,11 +294,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ if (ResourceRequest.ANY.equals(resourceName)) { resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); } + if (LOG.isDebugEnabled()) { + LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest); + } return resourceRequestInfo; } ResourceRequestInfo decResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req) { + ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); @@ -330,4 +339,34 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{ return remoteRequestsTable.isEmpty(); } + @SuppressWarnings("unchecked") + public void setResourceComparator(ProfileCapabilityComparator comparator) { + ProfileCapabilityComparator old = this.resourceComparator; + this.resourceComparator = comparator; + if (old != null) { + // we've already set a resource comparator - re-create the maps with the + // new one. 
this is needed in case someone adds container requests before + // registering with the RM. In such a case, the comparator won't have + // the resource profiles map. After registration, the map is available + // so re-create the capabilities maps + + for (Map.Entry<Priority, Map<String, Map<ExecutionType, + TreeMap<ProfileCapability, ResourceRequestInfo>>>> + priEntry : remoteRequestsTable.entrySet()) { + for (Map.Entry<String, Map<ExecutionType, TreeMap<ProfileCapability, + ResourceRequestInfo>>> nameEntry : priEntry.getValue().entrySet()) { + for (Map.Entry<ExecutionType, TreeMap<ProfileCapability, + ResourceRequestInfo>> execEntry : nameEntry + .getValue().entrySet()) { + Map<ProfileCapability, ResourceRequestInfo> capabilityMap = + execEntry.getValue(); + TreeMap<ProfileCapability, ResourceRequestInfo> newCapabiltyMap = + new TreeMap<>(resourceComparator); + newCapabiltyMap.putAll(capabilityMap); + execEntry.setValue(newCapabiltyMap); + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 09b12f2..662271a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -130,11 +130,13 @@ public class TestAMRMClient { @Before public void setup() throws Exception { conf = new YarnConfiguration(); - createClusterAndStartApplication(); + createClusterAndStartApplication(conf); } - 
private void createClusterAndStartApplication() throws Exception { + private void createClusterAndStartApplication(Configuration conf) + throws Exception { // start minicluster + this.conf = conf; conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); conf.setLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, @@ -536,7 +538,8 @@ public class TestAMRMClient { } @Test (timeout=60000) - public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { + public void testAMRMClientMatchingFitInferredRack() + throws YarnException, IOException { AMRMClientImpl<ContainerRequest> amClient = null; try { // start am rm client @@ -544,10 +547,10 @@ public class TestAMRMClient { amClient.init(conf); amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - + Resource capability = Resource.newInstance(1024, 2); - ContainerRequest storedContainer1 = + ContainerRequest storedContainer1 = new ContainerRequest(capability, nodes, null, priority); amClient.addContainerRequest(storedContainer1); @@ -564,14 +567,15 @@ public class TestAMRMClient { verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); assertEquals(storedContainer1, storedRequest); - + // inferred rack match no longer valid after request is removed amClient.removeContainerRequest(storedContainer1); matches = amClient.getMatchingRequests(priority, rack, capability); assertTrue(matches.isEmpty()); - - amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); + + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); } finally { if (amClient != null && amClient.getServiceState() == STATE.STARTED) { @@ -604,16 +608,19 @@ public class TestAMRMClient { amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer3); + + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); // 
test addition and storage RemoteRequestsTable<ContainerRequest> remoteRequestsTable = amClient.getTable(0); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); containersRequestedAny = remoteRequestsTable.get(priority1, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); List<? extends Collection<ContainerRequest>> matches = @@ -884,7 +891,7 @@ public class TestAMRMClient { teardown(); conf = new YarnConfiguration(); conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, "privacy"); - createClusterAndStartApplication(); + createClusterAndStartApplication(conf); initAMRMClientAndTest(false); } @@ -1701,14 +1708,16 @@ public class TestAMRMClient { int expAsks, int expRelease) { RemoteRequestsTable<ContainerRequest> remoteRequestsTable = amClient.getTable(allocationReqId); + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(expNode, containersRequestedNode); @@ -1906,4 +1915,106 @@ public class TestAMRMClient { } 
return result; } + + @Test(timeout = 60000) + public void testGetMatchingFitWithProfiles() throws Exception { + teardown(); + conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); + createClusterAndStartApplication(conf); + AMRMClient<ContainerRequest> amClient = null; + try { + // start am rm client + amClient = AMRMClient.<ContainerRequest>createAMRMClient(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + ProfileCapability capability1 = ProfileCapability.newInstance("minimum"); + ProfileCapability capability2 = ProfileCapability.newInstance("default"); + ProfileCapability capability3 = ProfileCapability.newInstance("maximum"); + ProfileCapability capability4 = ProfileCapability + .newInstance("minimum", Resource.newInstance(2048, 1)); + ProfileCapability capability5 = ProfileCapability.newInstance("default"); + ProfileCapability capability6 = ProfileCapability + .newInstance("default", Resource.newInstance(2048, 1)); + // http has the same capabilities as default + ProfileCapability capability7 = ProfileCapability.newInstance("http"); + + ContainerRequest storedContainer1 = + new ContainerRequest(capability1, nodes, racks, priority); + ContainerRequest storedContainer2 = + new ContainerRequest(capability2, nodes, racks, priority); + ContainerRequest storedContainer3 = + new ContainerRequest(capability3, nodes, racks, priority); + ContainerRequest storedContainer4 = + new ContainerRequest(capability4, nodes, racks, priority); + ContainerRequest storedContainer5 = + new ContainerRequest(capability5, nodes, racks, priority2); + ContainerRequest storedContainer6 = + new ContainerRequest(capability6, nodes, racks, priority); + ContainerRequest storedContainer7 = + new ContainerRequest(capability7, nodes, racks, priority); + + + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer2); + amClient.addContainerRequest(storedContainer3); + 
amClient.addContainerRequest(storedContainer4); + amClient.addContainerRequest(storedContainer5); + amClient.addContainerRequest(storedContainer6); + amClient.addContainerRequest(storedContainer7); + + // test matching of containers + List<? extends Collection<ContainerRequest>> matches; + ContainerRequest storedRequest; + // exact match + ProfileCapability testCapability1 = + ProfileCapability.newInstance("minimum"); + matches = amClient + .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, + testCapability1); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertEquals(storedContainer1, storedRequest); + amClient.removeContainerRequest(storedContainer1); + + // exact matching with order maintained + // we should get back 2 matches - default + http because they have the + // same capability + ProfileCapability testCapability2 = + ProfileCapability.newInstance("default"); + matches = amClient + .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, + testCapability2); + verifyMatches(matches, 2); + // must be returned in the order they were made + int i = 0; + for (ContainerRequest storedRequest1 : matches.get(0)) { + switch(i) { + case 0: + assertEquals(storedContainer2, storedRequest1); + break; + case 1: + assertEquals(storedContainer7, storedRequest1); + break; + } + i++; + } + amClient.removeContainerRequest(storedContainer5); + + // matching with larger container. 
all requests returned + Resource testCapability3 = Resource.newInstance(8192, 8); + matches = amClient + .getMatchingRequests(priority, node, testCapability3); + assertEquals(3, matches.size()); + + Resource testCapability4 = Resource.newInstance(2048, 1); + matches = amClient.getMatchingRequests(priority, node, testCapability4); + assertEquals(1, matches.size()); + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index ad18da3..53e70ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -274,9 +275,10 @@ public class TestAMRMClientContainerRequest { AMRMClientImpl<ContainerRequest> client, ContainerRequest request, String location, boolean 
expectedRelaxLocality, ExecutionType executionType) { - ResourceRequest ask = client.getTable(0) - .get(request.getPriority(), location, executionType, - request.getCapability()).remoteRequest; + ProfileCapability profileCapability = ProfileCapability + .newInstance(request.getResourceProfile(), request.getCapability()); + ResourceRequest ask = client.getTable(0).get(request.getPriority(), + location, executionType, profileCapability).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index e180f6d..00f5e03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; @@ -387,18 +388,21 @@ public class TestDistributedScheduling extends 
BaseAMRMProxyE2ETest { RemoteRequestsTable<ContainerRequest> remoteRequestsTable = amClient.getTable(0); + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); + int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = remoteRequestsTable.get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(2, containersRequestedNode); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 9b79e2d..ddabd17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import 
org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -255,9 +256,11 @@ public class TestNMClient { racks, priority)); } + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); int containersRequestedAny = rmClient.getTable(0) .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, - capability).remoteRequest.getNumContainers(); + profileCapability).remoteRequest.getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index 305d18b..12c32fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; 
+import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; @@ -99,6 +100,7 @@ public class TestOpportunisticContainerAllocation { private static final long AM_EXPIRE_MS = 4000; private static Resource capability; + private static ProfileCapability profileCapability; private static Priority priority; private static Priority priority2; private static Priority priority3; @@ -151,6 +153,7 @@ public class TestOpportunisticContainerAllocation { priority3 = Priority.newInstance(3); priority4 = Priority.newInstance(4); capability = Resource.newInstance(512, 1); + profileCapability = ProfileCapability.newInstance(capability); node = nodeReports.get(0).getNodeId().getHost(); rack = nodeReports.get(0).getRackName(); @@ -273,7 +276,7 @@ public class TestOpportunisticContainerAllocation { int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(1, oppContainersRequestedAny); @@ -394,7 +397,7 @@ public class TestOpportunisticContainerAllocation { new AMRMClient.ContainerRequest(capability, null, null, priority3)); int guarContainersRequestedAny = amClient.getTable(0).get(priority3, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(1, guarContainersRequestedAny); @@ -512,6 +515,7 @@ public class TestOpportunisticContainerAllocation { assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); + amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.addContainerRequest( @@ -532,17 +536,17 @@ public class TestOpportunisticContainerAllocation 
{ ExecutionType.OPPORTUNISTIC, true))); int containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(4, containersRequestedNode); @@ -564,17 +568,17 @@ public class TestOpportunisticContainerAllocation { ExecutionType.OPPORTUNISTIC, true))); containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(2, containersRequestedNode); @@ 
-691,10 +695,9 @@ public class TestOpportunisticContainerAllocation { ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); - int oppContainersRequestedAny = - amClient.getTable(0).get(priority3, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest - .getNumContainers(); + int oppContainersRequestedAny = amClient.getTable(0) + .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, + profileCapability).remoteRequest.getNumContainers(); assertEquals(2, oppContainersRequestedAny); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9e1ed84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json new file mode 100644 index 0000000..d0f3f72 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json @@ -0,0 +1,18 @@ +{ + "minimum": { + "memory-mb" : 1024, + "vcores" : 1 + }, + "default" : { + "memory-mb" : 2048, + "vcores" : 2 + }, + "maximum" : { + "memory-mb": 4096, + "vcores" : 4 + }, + "http" : { + "memory-mb" : 2048, + "vcores" : 2 + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org