Repository: hadoop Updated Branches: refs/heads/branch-2.6 bb6c79f76 -> 8dacf89b3
YARN-2501. Enhanced AMRMClient library to support requests against node labels. Contributed by Wangda Tan. (cherry picked from commit a5ec3d080978a67837946a991843a081ea712539) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8dacf89b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8dacf89b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8dacf89b Branch: refs/heads/branch-2.6 Commit: 8dacf89b3e8b0f47cd0ed90bffe0a2fed0afe51c Parents: bb6c79f Author: Vinod Kumar Vavilapalli <[email protected]> Authored: Fri Oct 10 19:57:39 2014 -0700 Committer: Vinod Kumar Vavilapalli <[email protected]> Committed: Fri Oct 10 19:59:30 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/client/api/AMRMClient.java | 36 ++++++++++++++++++-- .../yarn/client/api/impl/AMRMClientImpl.java | 18 ++++++---- .../yarn/client/api/impl/TestAMRMClient.java | 24 +++++++++++++ 4 files changed, 72 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8dacf89b/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2e3c286..73823bf 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -96,6 +96,9 @@ Release 2.6.0 - UNRELEASED YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda Tan via vinodkv) + YARN-2501. Enhanced AMRMClient library to support requests against node + labels. (Wangda Tan via vinodkv) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu http://git-wip-us.apache.org/repos/asf/hadoop/blob/8dacf89b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f41c018..6f8c65a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -105,6 +105,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends final List<String> racks; final Priority priority; final boolean relaxLocality; + final String nodeLabelsExpression; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -124,9 +125,9 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority) { - this(capability, nodes, racks, priority, true); + this(capability, nodes, racks, priority, true, null); } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. * @@ -147,6 +148,32 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality) { + this(capability, nodes, racks, priority, relaxLocality, null); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource + */ + public ContainerRequest(Resource capability, String[] nodes, + String[] racks, Priority priority, boolean relaxLocality, + String nodeLabelsExpression) { // Validate request Preconditions.checkArgument(capability != null, "The Resource to be requested for each container " + @@ -163,6 +190,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.priority = priority; this.relaxLocality = relaxLocality; + this.nodeLabelsExpression = nodeLabelsExpression; } public Resource getCapability() { @@ -185,6 +213,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends return relaxLocality; } + public String getNodeLabelExpression() { + return nodeLabelsExpression; + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8dacf89b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 88b2f45..e5e32e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -251,7 +251,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { // RPC layer is using it to send info across askList.add(ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality())); + r.getRelaxLocality(), r.getNodeLabelExpression())); } releaseList = new ArrayList<ContainerId>(release); // optimistically clear this collection assuming no RPC failure @@ -436,25 +436,25 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, req.getCapability(), req, - true); + true, req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - true); + true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - req.getRelaxLocality()); + req.getRelaxLocality(), req.getNodeLabelExpression()); } // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getCapability(), req, req.getRelaxLocality()); + req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression()); } @Override @@ -608,8 +608,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { ask.add(remoteRequest); } - private void addResourceRequest(Priority priority, String resourceName, - Resource capability, T req, boolean relaxLocality) { + private void + addResourceRequest(Priority priority, String resourceName, + Resource capability, T req, boolean relaxLocality, + String labelExpression) { Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -642,6 +644,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { if (relaxLocality) { resourceRequestInfo.containerRequests.add(req); } + + resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); // Note this down for next interaction with ResourceManager addResourceRequestToAsk(resourceRequestInfo.remoteRequest); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8dacf89b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 4921452..35e6635 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import com.google.common.base.Supplier; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -147,6 +148,7 @@ public class TestAMRMClient { racks = new String[]{ rack }; } + @SuppressWarnings("deprecation") @Before public void startApp() throws Exception { // submit new app @@ -667,6 +669,28 @@ public class TestAMRMClient { } } } + + @Test(timeout=30000) + public void testAskWithNodeLabels() { + AMRMClientImpl<ContainerRequest> client = + new AMRMClientImpl<ContainerRequest>(); + + // add x, y to ANY + client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, + 1), null, null, Priority.UNDEFINED, true, "x && y")); + Assert.assertEquals(1, client.ask.size()); + Assert.assertEquals("x && y", client.ask.iterator().next() + .getNodeLabelExpression()); + + // add x, y and a, b to ANY, only a, b should be kept + client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, + 1), null, null, Priority.UNDEFINED, true, "x && y")); + client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, + 1), null, null, Priority.UNDEFINED, true, "a && b")); + Assert.assertEquals(1, client.ask.size()); + Assert.assertEquals("a && b", client.ask.iterator().next() + .getNodeLabelExpression()); + } private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient) throws YarnException, IOException {
