MAPREDUCE-6304. Specifying node labels when submitting MR jobs. Contributed by 
Naganarasimha G R. Backport by Inigo Goiri.

(cherry picked from commit 3164e7d83875aa6b7435d1dfe61ac280aa277f1c)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42c2d36b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42c2d36b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42c2d36b

Branch: refs/heads/branch-2.7
Commit: 42c2d36ba620bfd560e5000ec06f9cac08d602d3
Parents: 2c3f6ae
Author: Konstantin V Shvachko <s...@apache.org>
Authored: Thu May 18 00:20:15 2017 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Thu May 18 00:20:15 2017 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../v2/app/rm/RMContainerAllocator.java         | 19 ++++-
 .../v2/app/rm/RMContainerRequestor.java         | 30 ++++---
 .../v2/app/rm/TestRMContainerAllocator.java     | 89 +++++++++++++++++++-
 .../apache/hadoop/mapreduce/MRJobConfig.java    | 20 +++++
 .../src/main/resources/mapred-default.xml       | 35 ++++++++
 .../org/apache/hadoop/mapred/YARNRunner.java    | 30 ++++++-
 .../apache/hadoop/mapred/TestYARNRunner.java    | 16 ++++
 8 files changed, 225 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt 
b/hadoop-mapreduce-project/CHANGES.txt
index 4d8753e..bb2a73f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -6,6 +6,9 @@ Release 2.7.4 - UNRELEASED
 
   NEW FEATURES
 
+    MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
+    (Naganarasimha G R. Backport by Inigo Goiri)
+
   IMPROVEMENTS
 
     MAPREDUCE-6741. Add MR support to redact job conf properties. (Haibo Chen

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 11ca5fa..0df58b7 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -178,6 +178,10 @@ public class RMContainerAllocator extends 
RMContainerRequestor
 
   private ScheduleStats scheduleStats = new ScheduleStats();
 
+  private String mapNodeLabelExpression;
+
+  private String reduceNodeLabelExpression;
+
   public RMContainerAllocator(ClientService clientService, AppContext context) 
{
     super(clientService, context);
     this.stopped = new AtomicBoolean(false);
@@ -214,6 +218,8 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     RackResolver.init(conf);
     retryInterval = 
getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
+    reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
     // Init startTime to current time. If all goes well, it will be reset after
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
@@ -404,9 +410,11 @@ public class RMContainerAllocator extends 
RMContainerRequestor
           reduceResourceRequest.getVirtualCores());
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
-          pendingReduces.addFirst(new ContainerRequest(reqEvent, 
PRIORITY_REDUCE));
+          pendingReduces.addFirst(new ContainerRequest(reqEvent,
+              PRIORITY_REDUCE, reduceNodeLabelExpression));
         } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
+              reduceNodeLabelExpression));
           //reduces are added to pending and are slowly ramped up
         }
       }
@@ -978,7 +986,9 @@ public class RMContainerAllocator extends 
RMContainerRequestor
       
       if (event.getEarlierAttemptFailed()) {
         earlierFailedMaps.add(event.getAttemptID());
-        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+        request =
+            new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
+                mapNodeLabelExpression);
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
       } else {
         for (String host : event.getHosts()) {
@@ -1003,7 +1013,8 @@ public class RMContainerAllocator extends 
RMContainerRequestor
             LOG.debug("Added attempt req to rack " + rack);
          }
        }
-       request = new ContainerRequest(event, PRIORITY_MAP);
+       request =
+           new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
       }
       maps.put(event.getAttemptID(), request);
       addContainerReq(request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index b466668..d89a82d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -122,39 +122,43 @@ public abstract class RMContainerRequestor extends 
RMCommunicator {
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
+    final String nodeLabelExpression;
+
     /**
      * the time when this request object was formed; can be used to avoid
      * aggressive preemption for recently placed requests
      */
     final long requestTimeMs;
 
-    public ContainerRequest(ContainerRequestEvent event, Priority priority) {
+    public ContainerRequest(ContainerRequestEvent event, Priority priority,
+        String nodeLabelExpression) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority);
+          event.getRacks(), priority, nodeLabelExpression);
     }
 
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
                             long requestTimeMs) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority, requestTimeMs);
+          event.getRacks(), priority, requestTimeMs, null);
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
                             Resource capability, String[] hosts, String[] 
racks,
-                            Priority priority) {
+                            Priority priority, String nodeLabelExpression) {
       this(attemptID, capability, hosts, racks, priority,
-          System.currentTimeMillis());
+          System.currentTimeMillis(), nodeLabelExpression);
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
         Resource capability, String[] hosts, String[] racks,
-        Priority priority, long requestTimeMs) {
+        Priority priority, long requestTimeMs, String nodeLabelExpression) {
       this.attemptID = attemptID;
       this.capability = capability;
       this.hosts = hosts;
       this.racks = racks;
       this.priority = priority;
       this.requestTimeMs = requestTimeMs;
+      this.nodeLabelExpression = nodeLabelExpression;
     }
     
     public String toString() {
@@ -391,17 +395,20 @@ public abstract class RMContainerRequestor extends 
RMCommunicator {
     for (String host : req.hosts) {
       // Data-local
       if (!isNodeBlacklisted(host)) {
-        addResourceRequest(req.priority, host, req.capability);
+        addResourceRequest(req.priority, host, req.capability,
+            null);
       }      
     }
 
     // Nothing Rack-local for now
     for (String rack : req.racks) {
-      addResourceRequest(req.priority, rack, req.capability);
+      addResourceRequest(req.priority, rack, req.capability,
+          null);
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
+    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+        req.nodeLabelExpression);
   }
 
   protected void decContainerReq(ContainerRequest req) {
@@ -418,7 +425,7 @@ public abstract class RMContainerRequestor extends 
RMCommunicator {
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability) {
+      Resource capability, String nodeLabelExpression) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -440,6 +447,7 @@ public abstract class RMContainerRequestor extends 
RMCommunicator {
       remoteRequest.setResourceName(resourceName);
       remoteRequest.setCapability(capability);
       remoteRequest.setNumContainers(0);
+      remoteRequest.setNodeLabelExpression(nodeLabelExpression);
       reqMap.put(capability, remoteRequest);
     }
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -534,7 +542,7 @@ public abstract class RMContainerRequestor extends 
RMCommunicator {
     }
     String[] hosts = newHosts.toArray(new String[newHosts.size()]);
     ContainerRequest newReq = new ContainerRequest(orig.attemptID, 
orig.capability,
-        hosts, orig.racks, orig.priority); 
+        hosts, orig.racks, orig.priority, orig.nodeLabelExpression); 
     return newReq;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 24b8766..da1fbfb 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -495,7 +496,7 @@ public class TestRMContainerAllocator {
     ContainerRequestEvent event1 =
         createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null));
+        new RMContainerRequestor.ContainerRequest(event1, null, null));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -629,6 +630,91 @@ public class TestRMContainerAllocator {
   }
 
   @Test
+  public void testMapReduceAllocationWithNodeLabelExpression() throws 
Exception {
+    // Checks that the configured map/reduce labels are sent on the "*" ask only.
+    LOG.info("Running testMapReduceAllocationWithNodeLabelExpression");
+    Configuration conf = new Configuration();
+    /*
+     * final int MAP_LIMIT = 3; final int REDUCE_LIMIT = 1;
+     * conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+     * conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+     */
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes");
+    conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+        };
+
+    // create a single map request that names the host "map"
+    ContainerRequestEvent reqMapEvents;
+    reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+    // create a single reduce request that names the host "reduce"
+    ContainerRequestEvent reqReduceEvents;
+    reqReduceEvents =
+        createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+    // verify all of the host-specific asks were sent plus one for the
+    // default rack and one for the ANY request
+    Assert.assertEquals(3, mockScheduler.lastAsk.size());
+    // verify ResourceRequest sent for MAP have appropriate node
+    // label expression as per the configuration
+    validateLabelsRequests(mockScheduler.lastAsk.get(0), false);
+    validateLabelsRequests(mockScheduler.lastAsk.get(1), false);
+    validateLabelsRequests(mockScheduler.lastAsk.get(2), false);
+
+    // assign a map task, then verify the next asks against the reduce label
+    ContainerId cid0 = mockScheduler.assignContainer("map", false);
+    allocator.schedule();
+    // again three asks are expected, now checked with isReduce == true
+    Assert.assertEquals(3, mockScheduler.lastAsk.size());
+    validateLabelsRequests(mockScheduler.lastAsk.get(0), true);
+    validateLabelsRequests(mockScheduler.lastAsk.get(1), true);
+    validateLabelsRequests(mockScheduler.lastAsk.get(2), true);
+
+    // NOTE(review): nothing further is verified; the allocator is just closed
+    allocator.close();
+  }
+
+  private void validateLabelsRequests(ResourceRequest resourceRequest,
+      boolean isReduce) { // isReduce picks which label the "*" ask must carry
+    switch (resourceRequest.getResourceName()) {
+    case "map":
+    case "reduce":
+    case NetworkTopology.DEFAULT_RACK: // host and rack asks: no label expected
+      Assert.assertNull(resourceRequest.getNodeLabelExpression());
+      break;
+    case "*": // presumably ResourceRequest.ANY; must carry the configured label
+      Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes",
+          resourceRequest.getNodeLabelExpression());
+      break;
+    default: // any other resource name is unexpected in this test
+      Assert.fail("Invalid resource location "
+          + resourceRequest.getResourceName());
+    }
+  }
+
+  @Test
   public void testMapReduceScheduling() throws Exception {
 
     LOG.info("Running testMapReduceScheduling");
@@ -1566,6 +1652,7 @@ public class TestRMContainerAllocator {
             .getNumContainers(), req.getRelaxLocality());
         askCopy.add(reqCopy);
       }
+      SecurityUtil.setTokenServiceUseIp(false);
       lastAsk = ask;
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index c028aaf..09e1002 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -63,6 +63,26 @@ public interface MRJobConfig {
 
   public static final String QUEUE_NAME = "mapreduce.job.queuename";
 
+  /**
+   *  Node Label expression applicable for all Job containers.
+   */
+  public static final String JOB_NODE_LABEL_EXP = 
"mapreduce.job.node-label-expression";
+
+  /**
+   * Node Label expression applicable for AM containers.
+   */
+  public static final String AM_NODE_LABEL_EXP = 
"mapreduce.job.am.node-label-expression";
+
+  /**
+   *  Node Label expression applicable for map containers.
+   */
+  public static final String MAP_NODE_LABEL_EXP = 
"mapreduce.map.node-label-expression";
+
+  /**
+   * Node Label expression applicable for reduce containers.
+   */
+  public static final String REDUCE_NODE_LABEL_EXP = 
"mapreduce.reduce.node-label-expression";
+
   public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
 
   public static final String JOB_TAGS = "mapreduce.job.tags";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index e9cb3f9..d01dd5f 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1584,6 +1584,41 @@
 <!-- MR YARN Application properties -->
 
 <property>
+ <name>mapreduce.job.node-label-expression</name>
+  <description>All the containers of the Map Reduce job will be run with this
+  node label expression. If no node-label-expression is set for the job, the
+  queue's default-node-label-expression is used for all of its containers.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.am.node-label-expression</name>
+  <description>This is node-label configuration for Map Reduce Application 
Master
+  container. If not configured it will make use of
+  mapreduce.job.node-label-expression and if job's node-label expression is not
+  configured then it will use queue's default-node-label-expression.
+  </description>
+</property>
+
+<property>
+ <name>mapreduce.map.node-label-expression</name>
+  <description>This is node-label configuration for Map task containers. If not
+  configured it will use mapreduce.job.node-label-expression and if job's
+  node-label expression is not configured then it will use queue's
+  default-node-label-expression.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.node-label-expression</name>
+  <description>This is node-label configuration for Reduce task containers. If
+  not configured it will use mapreduce.job.node-label-expression and if job's
+  node-label expression is not configured then it will use queue's
+  default-node-label-expression.
+  </description>
+</property>
+
+<property>
  <name>mapreduce.job.counters.limit</name>
   <value>120</value>
   <description>Limit on the number of user counters allowed per job.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 510f099..624345c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,8 +76,10 @@ import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -97,7 +99,15 @@ public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
 
-  private final RecordFactory recordFactory = 
RecordFactoryProvider.getRecordFactory(null);
+  private final static RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  public final static Priority AM_CONTAINER_PRIORITY = recordFactory
+      .newRecordInstance(Priority.class);
+  static {
+    AM_CONTAINER_PRIORITY.setPriority(0);
+  }
+
   private ResourceMgrDelegate resMgrDelegate;
   private ClientCache clientCache;
   private Configuration conf;
@@ -530,6 +540,24 @@ public class YARNRunner implements ClientProtocol {
         conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
             MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
     appContext.setResource(capability);
+
+    // set labels for the AM container request if present
+    String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
+    if (null != amNodelabelExpression
+        && amNodelabelExpression.trim().length() != 0) {
+      ResourceRequest amResourceRequest =
+          recordFactory.newRecordInstance(ResourceRequest.class);
+      amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
+      amResourceRequest.setResourceName(ResourceRequest.ANY);
+      amResourceRequest.setCapability(capability);
+      amResourceRequest.setNumContainers(1);
+      amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
+      appContext.setAMContainerResourceRequest(amResourceRequest);
+    }
+    // set labels for the Job containers
+    appContext.setNodeLabelExpression(jobConf
+        .get(JobContext.JOB_NODE_LABEL_EXP));
+
     appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
     if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
       appContext.setApplicationTags(new HashSet<String>(tagsFromConf));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index e0db094..470218e 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -563,6 +563,22 @@ public class TestYARNRunner extends TestCase {
     testAMStandardEnv(true);
   }
 
+  @Test
+  public void testNodeLabelExp() throws Exception {
+    JobConf jobConf = new JobConf();
+    // set a job-wide label and a different AM label; both are checked below
+    jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
+    jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
+
+    YARNRunner yarnRunner = new YARNRunner(jobConf);
+    ApplicationSubmissionContext appSubCtx =
+        buildSubmitContext(yarnRunner, jobConf);
+    // expected value goes first so a failure message reports the right sides
+    assertEquals("GPU", appSubCtx.getNodeLabelExpression());
+    assertEquals("highMem", appSubCtx.getAMContainerResourceRequest()
+        .getNodeLabelExpression());
+  }
+
   private void testAMStandardEnv(boolean customLibPath) throws Exception {
     // the Windows behavior is different and this test currently doesn't really
     // apply


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to