Author: tucu
Date: Sun Jun 16 22:11:38 2013
New Revision: 1493599

URL: http://svn.apache.org/r1493599
Log:
YARN-752. In AMRMClient, automatically add corresponding rack requests for 
requested nodes. (sandyr via tucu)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sun Jun 16 22:11:38 2013
@@ -372,6 +372,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
     use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
 
+    YARN-752. In AMRMClient, automatically add corresponding rack requests for 
+    requested nodes. (sandyr via tucu)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sun Jun 16 22:11:38 2013
@@ -42,23 +42,36 @@ import com.google.common.collect.Immutab
 public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
 
   /**
-   * Object to represent container request for resources.
-   * Resources may be localized to nodes and racks.
-   * Resources may be assigned priorities.
-   * All getters return unmodifiable collections.
-   * Can ask for multiple containers of a given type.
+   * Object to represent container request for resources. Scheduler
+   * documentation should be consulted for the specifics of how the parameters
+   * are honored.
+   * All getters return immutable values.
+   * 
+   * @param capability
+   *    The {@link Resource} to be requested for each container.
+   * @param nodes
+   *    Any hosts to request that the containers are placed on.
+   * @param racks
+   *    Any racks to request that the containers are placed on. The racks
+   *    corresponding to any hosts requested will be automatically added to
+   *    this list.
+   * @param priority
+   *    The priority at which to request the containers. Higher priorities have
+   *    lower numerical values.
+   * @param containerCount
+   *    The number of containers to request.
    */
   public static class ContainerRequest {
     final Resource capability;
-    final ImmutableList<String> hosts;
-    final ImmutableList<String> racks;
+    final List<String> nodes;
+    final List<String> racks;
     final Priority priority;
     final int containerCount;
         
-    public ContainerRequest(Resource capability, String[] hosts,
+    public ContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority, int containerCount) {
       this.capability = capability;
-      this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.priority = priority;
       this.containerCount = containerCount;
@@ -68,8 +81,8 @@ public interface AMRMClient<T extends AM
       return capability;
     }
     
-    public List<String> getHosts() {
-      return hosts;
+    public List<String> getNodes() {
+      return nodes;
     }
     
     public List<String> getRacks() {
@@ -103,9 +116,9 @@ public interface AMRMClient<T extends AM
    * AMRMClient can remove it from its internal store.
    */
   public static class StoredContainerRequest extends ContainerRequest {    
-    public StoredContainerRequest(Resource capability, String[] hosts,
+    public StoredContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority) {
-      super(capability, hosts, racks, priority, 1);
+      super(capability, nodes, racks, priority, 1);
     }
   }
   

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sun Jun 16 22:11:38 2013
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.base.Joiner;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends Co
   
   //Key -> Priority
   //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Key->ResourceName (e.g., nodename, rackname, *)
   //Value->Map
   //Key->Resource Capability
   //Value->ResourceRequest
@@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends Co
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    RackResolver.init(conf);
     super.serviceInit(conf);
   }
 
@@ -309,22 +314,37 @@ public class AMRMClientImpl<T extends Co
   
   @Override
   public synchronized void addContainerRequest(T req) {
-    // Create resource requests
-    // add check for dup locations
-    if (req.hosts != null) {
-      for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability,
-            req.containerCount, req);
+    Set<String> allRacks = new HashSet<String>();
+    if (req.racks != null) {
+      allRacks.addAll(req.racks);
+      if(req.racks.size() != allRacks.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate racks: "
+            + joiner.join(req.racks));
       }
     }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability,
+    allRacks.addAll(resolveRacks(req.nodes));
+    
+    if (req.nodes != null) {
+      HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
+      if(dedupedNodes.size() != req.nodes.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate nodes: "
+            + joiner.join(req.nodes));        
+      }
+      for (String node : dedupedNodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        addResourceRequest(req.priority, node, req.capability,
             req.containerCount, req);
       }
     }
 
+    for (String rack : allRacks) {
+      addResourceRequest(req.priority, rack, req.capability,
+          req.containerCount, req);
+    }
+
     // Off-switch
     addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
         req.containerCount, req);
@@ -332,19 +352,23 @@ public class AMRMClientImpl<T extends Co
 
   @Override
   public synchronized void removeContainerRequest(T req) {
+    Set<String> allRacks = new HashSet<String>();
+    if (req.racks != null) {
+      allRacks.addAll(req.racks);
+    }
+    allRacks.addAll(resolveRacks(req.nodes));
+
     // Update resource requests
-    if (req.hosts != null) {
-      for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability,
+    if (req.nodes != null) {
+      for (String node : new HashSet<String>(req.nodes)) {
+        decResourceRequest(req.priority, node, req.capability,
             req.containerCount, req);
       }
     }
 
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability,
-            req.containerCount, req);
-      }
+    for (String rack : allRacks) {
+      decResourceRequest(req.priority, rack, req.capability,
+          req.containerCount, req);
     }
 
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
@@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends Co
     return list;          
   }
   
+  private Set<String> resolveRacks(List<String> nodes) {
+    Set<String> racks = new HashSet<String>();    
+    if (nodes != null) {
+      for (String node : nodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        String rack = RackResolver.resolve(node).getNetworkLocation();
+        if (rack == null) {
+          LOG.warn("Failed to resolve rack for node " + node + ".");
+        } else {
+          racks.add(rack);
+        }
+      }
+    }
+    
+    return racks;
+  }
+  
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // This code looks weird but is needed because of the following scenario.
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sun Jun 16 22:11:38 2013
@@ -267,6 +267,51 @@ public class TestAMRMClient {
     assertTrue(matches.size() == 1);
     assertTrue(matches.get(0).size() == matchSize);    
   }
+  
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Resource capability = Resource.newInstance(1024, 2);
+
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability, nodes, null, priority);
+      amClient.addContainerRequest(storedContainer1);
+
+      // verify matching with original node and inferred rack
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match node
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      // inferred match rack
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      
+      // inferred rack match no longer valid after request is removed
+      amClient.removeContainerRequest(storedContainer1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      assertTrue(matches.isEmpty());
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
 
   @Test (timeout=60000)
   public void testAMRMClientMatchStorage() throws YarnException, IOException {

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java?rev=1493599&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java Sun Jun 16 22:11:38 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
+import static org.junit.Assert.assertEquals;
+
+public class TestAMRMClientContainerRequest {
+  @Test
+  public void testFillInRacks() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+ 
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1), 4);
+    client.addContainerRequest(request);
+    verifyResourceRequestLocation(client, request, "host1");
+    verifyResourceRequestLocation(client, request, "host2");
+    verifyResourceRequestLocation(client, request, "/rack1");
+    verifyResourceRequestLocation(client, request, "/rack2");
+    verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
+  }
+  
+  private static class MyResolver implements DNSToSwitchMapping {
+
+    @Override
+    public List<String> resolve(List<String> names) {
+      return Arrays.asList("/rack1");
+    }
+
+    @Override
+    public void reloadCachedMappings() {}
+  }
+  
+  private void verifyResourceRequestLocation(
+      AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+      String location) {
+    ResourceRequest ask =  client.remoteRequestsTable.get(request.priority)
+        .get(location).get(request.capability).remoteRequest;
+    assertEquals(location, ask.getResourceName());
+    assertEquals(request.getContainerCount(), ask.getNumContainers());
+  }
+}


Reply via email to