MAPREDUCE-6871. Allow users to specify racks and nodes for strict locality for AMs (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3721cfe1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3721cfe1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3721cfe1

Branch: refs/heads/YARN-2915
Commit: 3721cfe1fbd98c5b6aa46aefdfcf62276c28c4a4
Parents: 5078df7
Author: Robert Kanter <rkan...@apache.org>
Authored: Fri Apr 21 16:12:01 2017 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Fri Apr 21 16:12:01 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   6 +
 .../org/apache/hadoop/mapred/YARNRunner.java    | 132 ++++++++++++---
 .../apache/hadoop/mapred/TestYARNRunner.java    | 167 +++++++++++++++++++
 3 files changed, 279 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3721cfe1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 18bf139..cfc1bcc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -91,6 +91,12 @@ public interface MRJobConfig {
    */
   public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
 
+  /**
+   * Specify strict locality on a comma-separated list of racks and/or nodes.
+   * Syntax: /rack or /rack/node or node (assumes /default-rack)
+   */
+  public static final String AM_STRICT_LOCALITY = "mapreduce.job.am.strict-locality";
+
   public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
 
   public static final String JOB_TAGS = "mapreduce.job.tags";
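
For context on the user-facing side: the new property takes a comma-separated list of racks and/or nodes, and a bare node name is treated as living on /default-rack. Below is a minimal, hypothetical job setup (the class name and the rack/node values are invented for illustration); jobs that go through ToolRunner could also pass the same value on the command line via -Dmapreduce.job.am.strict-locality=...

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.MRJobConfig;

  public class StrictAmLocalityExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Ask for the MR AM to run on node1 of rack1, or anywhere on rack2.
      // A bare "node3" would be interpreted as /default-rack/node3.
      conf.set(MRJobConfig.AM_STRICT_LOCALITY, "/rack1/node1,/rack2");
      Job job = Job.getInstance(conf, "strict-am-locality-example");
      // ... set mapper/reducer, input/output paths, etc., then job.submit().
    }
  }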

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3721cfe1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 2339c79..1baa467 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -22,13 +22,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -101,6 +102,25 @@ public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
 
+  private static final String RACK_GROUP = "rack";
+  private static final String NODE_IF_RACK_GROUP = "node1";
+  private static final String NODE_IF_NO_RACK_GROUP = "node2";
+
+  /**
+   * Matches any of the following patterns with capturing groups:
+   * <ul>
+   *  <li>/rack</li>
+   *  <li>/rack/node</li>
+   *  <li>node (assumes /default-rack)</li>
+   * </ul>
+   * The groups can be retrieved using the RACK_GROUP, NODE_IF_RACK_GROUP,
+   * and/or NODE_IF_NO_RACK_GROUP group keys.
+   */
+  private static final Pattern RACK_NODE_PATTERN =
+      Pattern.compile(
+          String.format("(?<%s>[^/]+?)|(?<%s>/[^/]+?)(?:/(?<%s>[^/]+?))?",
+          NODE_IF_NO_RACK_GROUP, RACK_GROUP, NODE_IF_RACK_GROUP));
+
   private final static RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
 
@@ -503,20 +523,6 @@ public class YARNRunner implements ClientProtocol {
       throws IOException {
     ApplicationId applicationId = resMgrDelegate.getApplicationId();
 
-    // Setup resource requirements
-    Resource capability = recordFactory.newRecordInstance(Resource.class);
-    capability.setMemorySize(
-        conf.getInt(
-            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
-        )
-    );
-    capability.setVirtualCores(
-        conf.getInt(
-            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
-        )
-    );
-    LOG.debug("AppMaster capability = " + capability);
-
     // Setup LocalResources
     Map<String, LocalResource> localResources =
         setupLocalResources(jobConf, jobSubmitDir);
@@ -577,21 +583,18 @@ public class YARNRunner implements ClientProtocol {
     appContext.setMaxAppAttempts(
         conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
             MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
-    appContext.setResource(capability);
 
-    // set labels for the AM container request if present
+    // Setup the AM ResourceRequests
+    List<ResourceRequest> amResourceRequests = generateResourceRequests();
+    appContext.setAMContainerResourceRequests(amResourceRequests);
+
+    // set labels for the AM container requests if present
     String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
     if (null != amNodelabelExpression
         && amNodelabelExpression.trim().length() != 0) {
-      ResourceRequest amResourceRequest =
-          recordFactory.newRecordInstance(ResourceRequest.class);
-      amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
-      amResourceRequest.setResourceName(ResourceRequest.ANY);
-      amResourceRequest.setCapability(capability);
-      amResourceRequest.setNumContainers(1);
-      amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
-      appContext.setAMContainerResourceRequests(
-          Collections.singletonList(amResourceRequest));
+      for (ResourceRequest amResourceRequest : amResourceRequests) {
+        amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
+      }
     }
     // set labels for the Job containers
     appContext.setNodeLabelExpression(jobConf
@@ -616,6 +619,83 @@ public class YARNRunner implements ClientProtocol {
     return appContext;
   }
 
+  private List<ResourceRequest> generateResourceRequests() throws IOException {
+    Resource capability = recordFactory.newRecordInstance(Resource.class);
+    capability.setMemorySize(
+        conf.getInt(
+            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+        )
+    );
+    capability.setVirtualCores(
+        conf.getInt(
+            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+        )
+    );
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AppMaster capability = " + capability);
+    }
+
+    List<ResourceRequest> amResourceRequests = new ArrayList<>();
+    // Always have an ANY request
+    ResourceRequest amAnyResourceRequest =
+        createAMResourceRequest(ResourceRequest.ANY, capability);
+    Map<String, ResourceRequest> rackRequests = new HashMap<>();
+    amResourceRequests.add(amAnyResourceRequest);
+    Collection<String> amStrictResources = conf.getStringCollection(
+        MRJobConfig.AM_STRICT_LOCALITY);
+    for (String amStrictResource : amStrictResources) {
+      amAnyResourceRequest.setRelaxLocality(false);
+      Matcher matcher = RACK_NODE_PATTERN.matcher(amStrictResource);
+      if (matcher.matches()) {
+        String nodeName;
+        String rackName = matcher.group(RACK_GROUP);
+        if (rackName == null) {
+          rackName = "/default-rack";
+          nodeName = matcher.group(NODE_IF_NO_RACK_GROUP);
+        } else {
+          nodeName = matcher.group(NODE_IF_RACK_GROUP);
+        }
+        ResourceRequest amRackResourceRequest = rackRequests.get(rackName);
+        if (amRackResourceRequest == null) {
+          amRackResourceRequest = createAMResourceRequest(rackName, capability);
+          amResourceRequests.add(amRackResourceRequest);
+          rackRequests.put(rackName, amRackResourceRequest);
+        }
+        if (nodeName != null) {
+          amRackResourceRequest.setRelaxLocality(false);
+          ResourceRequest amNodeResourceRequest =
+              createAMResourceRequest(nodeName, capability);
+          amResourceRequests.add(amNodeResourceRequest);
+        }
+      } else {
+        String errMsg =
+            "Invalid resource name: " + amStrictResource + " specified.";
+        LOG.warn(errMsg);
+        throw new IOException(errMsg);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      for (ResourceRequest amResourceRequest : amResourceRequests) {
+        LOG.debug("ResourceRequest: resource = "
+            + amResourceRequest.getResourceName() + ", locality = "
+            + amResourceRequest.getRelaxLocality());
+      }
+    }
+    return amResourceRequests;
+  }
+
+  private ResourceRequest createAMResourceRequest(String resource,
+      Resource capability) {
+    ResourceRequest resourceRequest =
+        recordFactory.newRecordInstance(ResourceRequest.class);
+    resourceRequest.setPriority(AM_CONTAINER_PRIORITY);
+    resourceRequest.setResourceName(resource);
+    resourceRequest.setCapability(capability);
+    resourceRequest.setNumContainers(1);
+    resourceRequest.setRelaxLocality(true);
+    return resourceRequest;
+  }
+
   private void setTokenRenewerConf(ContainerLaunchContext context,
       Configuration conf, String regex) throws IOException {
     DataOutputBuffer dob = new DataOutputBuffer();
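
To make the parsing above concrete, here is a small standalone sketch (the class name and the sample values are invented for illustration) that mirrors how generateResourceRequests() decomposes each entry of mapreduce.job.am.strict-locality with RACK_NODE_PATTERN:

  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class RackNodePatternDemo {
    // Same expression the patch builds via String.format, with the named groups
    // node2 (bare node), rack, and node1 (node under a rack).
    private static final Pattern RACK_NODE_PATTERN = Pattern.compile(
        "(?<node2>[^/]+?)|(?<rack>/[^/]+?)(?:/(?<node1>[^/]+?))?");

    public static void main(String[] args) {
      for (String s : new String[] {
          "node1", "/rack1", "/rack1/node1", "rack/node1"}) {
        Matcher m = RACK_NODE_PATTERN.matcher(s);
        if (m.matches()) {
          String rack = m.group("rack") == null
              ? "/default-rack" : m.group("rack");
          String node = m.group("rack") == null
              ? m.group("node2") : m.group("node1");
          System.out.println(s + " -> rack=" + rack + ", node=" + node);
        } else {
          // YARNRunner rejects such an entry with an IOException.
          System.out.println(s + " -> invalid");
        }
      }
    }
  }

This prints "node1 -> rack=/default-rack, node=node1", "/rack1 -> rack=/rack1, node=null", "/rack1/node1 -> rack=/rack1, node=node1", and "rack/node1 -> invalid". For "/rack1/node1" the submission context then carries three ResourceRequests: ANY and /rack1 with relaxLocality=false, plus node1 with relaxLocality=true, as exercised by the new tests below.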

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3721cfe1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c2bda62..bd3e524 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -40,6 +41,7 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -93,6 +95,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
@@ -576,6 +580,169 @@ public class TestYARNRunner {
   }
 
   @Test
+  public void testResourceRequestLocalityAny() throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, true);
+    verifyResourceRequestLocality(null, null, amAnyResourceRequest);
+    verifyResourceRequestLocality(null, "label1", amAnyResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityRack() throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, false);
+    ResourceRequest amRackResourceRequest =
+        createResourceRequest("/rack1", true);
+    verifyResourceRequestLocality("/rack1", null, amAnyResourceRequest,
+        amRackResourceRequest);
+    verifyResourceRequestLocality("/rack1", "label1", amAnyResourceRequest,
+        amRackResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityNode() throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, false);
+    ResourceRequest amRackResourceRequest =
+        createResourceRequest("/rack1", false);
+    ResourceRequest amNodeResourceRequest =
+        createResourceRequest("node1", true);
+    verifyResourceRequestLocality("/rack1/node1", null, amAnyResourceRequest,
+        amRackResourceRequest, amNodeResourceRequest);
+    verifyResourceRequestLocality("/rack1/node1", "label1",
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityNodeDefaultRack() throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, false);
+    ResourceRequest amRackResourceRequest =
+        createResourceRequest("/default-rack", false);
+    ResourceRequest amNodeResourceRequest =
+        createResourceRequest("node1", true);
+    verifyResourceRequestLocality("node1", null,
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
+    verifyResourceRequestLocality("node1", "label1",
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityMultipleNodes() throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, false);
+    ResourceRequest amRackResourceRequest =
+        createResourceRequest("/rack1", false);
+    ResourceRequest amNodeResourceRequest =
+        createResourceRequest("node1", true);
+    ResourceRequest amNode2ResourceRequest =
+        createResourceRequest("node2", true);
+    verifyResourceRequestLocality("/rack1/node1,/rack1/node2", null,
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
+        amNode2ResourceRequest);
+    verifyResourceRequestLocality("/rack1/node1,/rack1/node2", "label1",
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
+        amNode2ResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityMultipleNodesDifferentRack()
+      throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, false);
+    ResourceRequest amRackResourceRequest =
+        createResourceRequest("/rack1", false);
+    ResourceRequest amNodeResourceRequest =
+        createResourceRequest("node1", true);
+    ResourceRequest amRack2ResourceRequest =
+        createResourceRequest("/rack2", false);
+    ResourceRequest amNode2ResourceRequest =
+        createResourceRequest("node2", true);
+    verifyResourceRequestLocality("/rack1/node1,/rack2/node2", null,
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
+        amRack2ResourceRequest, amNode2ResourceRequest);
+    verifyResourceRequestLocality("/rack1/node1,/rack2/node2", "label1",
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
+        amRack2ResourceRequest, amNode2ResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityMultipleNodesDefaultRack()
+      throws Exception {
+    ResourceRequest amAnyResourceRequest =
+        createResourceRequest(ResourceRequest.ANY, false);
+    ResourceRequest amRackResourceRequest =
+        createResourceRequest("/rack1", false);
+    ResourceRequest amNodeResourceRequest =
+        createResourceRequest("node1", true);
+    ResourceRequest amRack2ResourceRequest =
+        createResourceRequest("/default-rack", false);
+    ResourceRequest amNode2ResourceRequest =
+        createResourceRequest("node2", true);
+    verifyResourceRequestLocality("/rack1/node1,node2", null,
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
+        amRack2ResourceRequest, amNode2ResourceRequest);
+    verifyResourceRequestLocality("/rack1/node1,node2", "label1",
+        amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
+        amRack2ResourceRequest, amNode2ResourceRequest);
+  }
+
+  @Test
+  public void testResourceRequestLocalityInvalid() throws Exception {
+    try {
+      verifyResourceRequestLocality("rack/node1", null,
+          new ResourceRequest[]{});
+      fail("Should have failed due to invalid resource but did not");
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().contains("Invalid resource name"));
+    }
+    try {
+      verifyResourceRequestLocality("/rack/node1/blah", null,
+          new ResourceRequest[]{});
+      fail("Should have failed due to invalid resource but did not");
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().contains("Invalid resource name"));
+    }
+  }
+
+  private void verifyResourceRequestLocality(String strictResource,
+      String label, ResourceRequest... expectedReqs) throws Exception {
+    JobConf jobConf = new JobConf();
+    if (strictResource != null) {
+      jobConf.set(MRJobConfig.AM_STRICT_LOCALITY, strictResource);
+    }
+    if (label != null) {
+      jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, label);
+      for (ResourceRequest expectedReq : expectedReqs) {
+        expectedReq.setNodeLabelExpression(label);
+      }
+    }
+
+    YARNRunner yarnRunner = new YARNRunner(jobConf);
+    ApplicationSubmissionContext appSubCtx =
+        buildSubmitContext(yarnRunner, jobConf);
+    assertEquals(Arrays.asList(expectedReqs),
+        appSubCtx.getAMContainerResourceRequests());
+  }
+
+  private ResourceRequest createResourceRequest(String name,
+      boolean relaxLocality) {
+    Resource capability = recordFactory.newRecordInstance(Resource.class);
+    capability.setMemorySize(MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+    capability.setVirtualCores(MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
+
+    ResourceRequest req =
+        recordFactory.newRecordInstance(ResourceRequest.class);
+    req.setPriority(YARNRunner.AM_CONTAINER_PRIORITY);
+    req.setResourceName(name);
+    req.setCapability(capability);
+    req.setNumContainers(1);
+    req.setRelaxLocality(relaxLocality);
+
+    return req;
+  }
+
+  @Test
   public void testAMStandardEnvWithDefaultLibPath() throws Exception {
     testAMStandardEnv(false);
   }

