Author: tucu Date: Sun Jun 16 22:18:30 2013 New Revision: 1493601 URL: http://svn.apache.org/r1493601 Log: YARN-752. In AMRMClient, automatically add corresponding rack requests for requested nodes. (sandyr via tucu)
Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java - copied unchanged from r1493599, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1493601&r1=1493600&r2=1493601&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sun Jun 16 22:18:30 2013 @@ -352,6 +352,9 @@ Release 2.1.0-beta - UNRELEASED YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv) + YARN-752. In AMRMClient, automatically add corresponding rack requests for + requested nodes. (sandyr via tucu) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493601&r1=1493600&r2=1493601&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sun Jun 16 22:18:30 2013 @@ -42,23 +42,36 @@ import com.google.common.collect.Immutab public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service { /** - * Object to represent container request for resources. - * Resources may be localized to nodes and racks. - * Resources may be assigned priorities. - * All getters return unmodifiable collections. - * Can ask for multiple containers of a given type. + * Object to represent container request for resources. Scheduler + * documentation should be consulted for the specifics of how the parameters + * are honored. + * All getters return immutable values. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The racks + * corresponding to any hosts requested will be automatically added to + * this list. + * @param priority + * The priority at which to request the containers. Higher priorities have + * lower numerical values. + * @param containerCount + * The number of containers to request. */ public static class ContainerRequest { final Resource capability; - final ImmutableList<String> hosts; - final ImmutableList<String> racks; + final List<String> nodes; + final List<String> racks; final Priority priority; final int containerCount; - public ContainerRequest(Resource capability, String[] hosts, + public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, int containerCount) { this.capability = capability; - this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null); + this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.priority = priority; this.containerCount = containerCount; @@ -68,8 +81,8 @@ public interface AMRMClient<T extends AM return capability; } - public List<String> getHosts() { - return hosts; + public List<String> getNodes() { + return nodes; } public List<String> getRacks() { @@ -103,9 +116,9 @@ public interface AMRMClient<T extends AM * AMRMClient can remove it from its internal store. */ public static class StoredContainerRequest extends ContainerRequest { - public StoredContainerRequest(Resource capability, String[] hosts, + public StoredContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority) { - super(capability, hosts, racks, priority, 1); + super(capability, nodes, racks, priority, 1); } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493601&r1=1493600&r2=1493601&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sun Jun 16 22:18:30 2013 @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories. import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.RackResolver; + +import com.google.common.base.Joiner; import com.google.common.annotations.VisibleForTesting; @@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends Co //Key -> Priority //Value -> Map - //Key->ResourceName (e.g., hostname, rackname, *) + //Key->ResourceName (e.g., nodename, rackname, *) //Value->Map //Key->Resource Capability //Value->ResourceRequest @@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends Co @Override protected void serviceInit(Configuration conf) throws Exception { + RackResolver.init(conf); super.serviceInit(conf); } @@ -309,22 +314,37 @@ public class AMRMClientImpl<T extends Co @Override public synchronized void addContainerRequest(T req) { - // Create resource requests - // add check for dup locations - if (req.hosts != null) { - for (String host : req.hosts) { - addResourceRequest(req.priority, host, req.capability, - req.containerCount, req); + Set<String> allRacks = new HashSet<String>(); + if (req.racks != null) { + allRacks.addAll(req.racks); + if(req.racks.size() != allRacks.size()) { + Joiner joiner = Joiner.on(','); + LOG.warn("ContainerRequest has duplicate racks: " + + joiner.join(req.racks)); } } - - if (req.racks != null) { - for (String rack : req.racks) { - addResourceRequest(req.priority, rack, req.capability, + allRacks.addAll(resolveRacks(req.nodes)); + + if (req.nodes != null) { + HashSet<String> dedupedNodes = new HashSet<String>(req.nodes); + if(dedupedNodes.size() != req.nodes.size()) { + Joiner joiner = Joiner.on(','); + LOG.warn("ContainerRequest has duplicate nodes: " + + joiner.join(req.nodes)); + } + for (String node : dedupedNodes) { + // Ensure node requests are accompanied by requests for + // corresponding rack + addResourceRequest(req.priority, node, req.capability, req.containerCount, req); } } + for (String rack : allRacks) { + addResourceRequest(req.priority, rack, req.capability, + req.containerCount, req); + } + // Off-switch addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.containerCount, req); @@ -332,19 +352,23 @@ public class AMRMClientImpl<T extends Co @Override public synchronized void removeContainerRequest(T req) { + Set<String> allRacks = new HashSet<String>(); + if (req.racks != null) { + allRacks.addAll(req.racks); + } + allRacks.addAll(resolveRacks(req.nodes)); + // Update resource requests - if (req.hosts != null) { - for (String hostName : req.hosts) { - decResourceRequest(req.priority, hostName, req.capability, + if (req.nodes != null) { + for (String node : new HashSet<String>(req.nodes)) { + decResourceRequest(req.priority, node, req.capability, req.containerCount, req); } } - if (req.racks != null) { - for (String rack : req.racks) { - decResourceRequest(req.priority, rack, req.capability, - req.containerCount, req); - } + for (String rack : allRacks) { + decResourceRequest(req.priority, rack, req.capability, + req.containerCount, req); } decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, @@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends Co return list; } + private Set<String> resolveRacks(List<String> nodes) { + Set<String> racks = new HashSet<String>(); + if (nodes != null) { + for (String node : nodes) { + // Ensure node requests are accompanied by requests for + // corresponding rack + String rack = RackResolver.resolve(node).getNetworkLocation(); + if (rack == null) { + LOG.warn("Failed to resolve rack for node " + node + "."); + } else { + racks.add(rack); + } + } + } + + return racks; + } + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493601&r1=1493600&r2=1493601&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sun Jun 16 22:18:30 2013 @@ -267,6 +267,51 @@ public class TestAMRMClient { assertTrue(matches.size() == 1); assertTrue(matches.get(0).size() == matchSize); } + + @Test (timeout=60000) + public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { + AMRMClientImpl<StoredContainerRequest> amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + Resource capability = Resource.newInstance(1024, 2); + + StoredContainerRequest storedContainer1 = + new StoredContainerRequest(capability, nodes, null, priority); + amClient.addContainerRequest(storedContainer1); + + // verify matching with original node and inferred rack + List<? extends Collection<StoredContainerRequest>> matches; + StoredContainerRequest storedRequest; + // exact match node + matches = amClient.getMatchingRequests(priority, node, capability); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertTrue(storedContainer1 == storedRequest); + // inferred match rack + matches = amClient.getMatchingRequests(priority, rack, capability); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertTrue(storedContainer1 == storedRequest); + + // inferred rack match no longer valid after request is removed + amClient.removeContainerRequest(storedContainer1); + matches = amClient.getMatchingRequests(priority, rack, capability); + assertTrue(matches.isEmpty()); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } @Test (timeout=60000) public void testAMRMClientMatchStorage() throws YarnException, IOException {