MAPREDUCE-6304. Specifying node labels when submitting MR jobs. Contributed by Naganarasimha G R. Backport by Inigo Goiri.
(cherry picked from commit 3164e7d83875aa6b7435d1dfe61ac280aa277f1c) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42c2d36b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42c2d36b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42c2d36b Branch: refs/heads/branch-2.7 Commit: 42c2d36ba620bfd560e5000ec06f9cac08d602d3 Parents: 2c3f6ae Author: Konstantin V Shvachko <s...@apache.org> Authored: Thu May 18 00:20:15 2017 -0700 Committer: Konstantin V Shvachko <s...@apache.org> Committed: Thu May 18 00:20:15 2017 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/rm/RMContainerAllocator.java | 19 ++++- .../v2/app/rm/RMContainerRequestor.java | 30 ++++--- .../v2/app/rm/TestRMContainerAllocator.java | 89 +++++++++++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 20 +++++ .../src/main/resources/mapred-default.xml | 35 ++++++++ .../org/apache/hadoop/mapred/YARNRunner.java | 30 ++++++- .../apache/hadoop/mapred/TestYARNRunner.java | 16 ++++ 8 files changed, 225 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4d8753e..bb2a73f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -6,6 +6,9 @@ Release 2.7.4 - UNRELEASED NEW FEATURES + MAPREDUCE-6304. Specifying node labels when submitting MR jobs. + (Naganarasimha G R. Backport by Inigo Goiri) + IMPROVEMENTS MAPREDUCE-6741. Add MR support to redact job conf properties. (Haibo Chen http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 11ca5fa..0df58b7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -178,6 +178,10 @@ public class RMContainerAllocator extends RMContainerRequestor private ScheduleStats scheduleStats = new ScheduleStats(); + private String mapNodeLabelExpression; + + private String reduceNodeLabelExpression; + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); this.stopped = new AtomicBoolean(false); @@ -214,6 +218,8 @@ public class RMContainerAllocator extends RMContainerRequestor RackResolver.init(conf); retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); + mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP); + reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP); // Init startTime to current time. If all goes well, it will be reset after // first attempt to contact RM. retrystartTime = System.currentTimeMillis(); @@ -404,9 +410,11 @@ public class RMContainerAllocator extends RMContainerRequestor reduceResourceRequest.getVirtualCores()); if (reqEvent.getEarlierAttemptFailed()) { //add to the front of queue for fail fast - pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); + pendingReduces.addFirst(new ContainerRequest(reqEvent, + PRIORITY_REDUCE, reduceNodeLabelExpression)); } else { - pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); + pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE, + reduceNodeLabelExpression)); //reduces are added to pending and are slowly ramped up } } @@ -978,7 +986,9 @@ public class RMContainerAllocator extends RMContainerRequestor if (event.getEarlierAttemptFailed()) { earlierFailedMaps.add(event.getAttemptID()); - request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP); + request = + new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP, + mapNodeLabelExpression); LOG.info("Added "+event.getAttemptID()+" to list of failed maps"); } else { for (String host : event.getHosts()) { @@ -1003,7 +1013,8 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.debug("Added attempt req to rack " + rack); } } - request = new ContainerRequest(event, PRIORITY_MAP); + request = + new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); } maps.put(event.getAttemptID(), request); addContainerReq(request); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index b466668..d89a82d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -122,39 +122,43 @@ public abstract class RMContainerRequestor extends RMCommunicator { final String[] racks; //final boolean earlierAttemptFailed; final Priority priority; + final String nodeLabelExpression; + /** * the time when this request object was formed; can be used to avoid * aggressive preemption for recently placed requests */ final long requestTimeMs; - public ContainerRequest(ContainerRequestEvent event, Priority priority) { + public ContainerRequest(ContainerRequestEvent event, Priority priority, + String nodeLabelExpression) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority); + event.getRacks(), priority, nodeLabelExpression); } public ContainerRequest(ContainerRequestEvent event, Priority priority, long requestTimeMs) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority, requestTimeMs); + event.getRacks(), priority, requestTimeMs, null); } public ContainerRequest(TaskAttemptId attemptID, Resource capability, String[] hosts, String[] racks, - Priority priority) { + Priority priority, String nodeLabelExpression) { this(attemptID, capability, hosts, racks, priority, - System.currentTimeMillis()); + System.currentTimeMillis(), nodeLabelExpression); } public ContainerRequest(TaskAttemptId attemptID, Resource capability, String[] hosts, String[] racks, - Priority priority, long requestTimeMs) { + Priority priority, long requestTimeMs, String nodeLabelExpression) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; this.racks = racks; this.priority = priority; this.requestTimeMs = requestTimeMs; + this.nodeLabelExpression = nodeLabelExpression; } public String toString() { @@ -391,17 +395,20 @@ public abstract class RMContainerRequestor extends RMCommunicator { for (String host : req.hosts) { // Data-local if (!isNodeBlacklisted(host)) { - addResourceRequest(req.priority, host, req.capability); + addResourceRequest(req.priority, host, req.capability, + null); } } // Nothing Rack-local for now for (String rack : req.racks) { - addResourceRequest(req.priority, rack, req.capability); + addResourceRequest(req.priority, rack, req.capability, + null); } // Off-switch - addResourceRequest(req.priority, ResourceRequest.ANY, req.capability); + addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, + req.nodeLabelExpression); } protected void decContainerReq(ContainerRequest req) { @@ -418,7 +425,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { } private void addResourceRequest(Priority priority, String resourceName, - Resource capability) { + Resource capability, String nodeLabelExpression) { Map<String, Map<Resource, ResourceRequest>> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -440,6 +447,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { remoteRequest.setResourceName(resourceName); remoteRequest.setCapability(capability); remoteRequest.setNumContainers(0); + remoteRequest.setNodeLabelExpression(nodeLabelExpression); reqMap.put(capability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); @@ -534,7 +542,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { } String[] hosts = newHosts.toArray(new String[newHosts.size()]); ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability, - hosts, orig.racks, orig.priority); + hosts, orig.racks, orig.priority, orig.nodeLabelExpression); return newReq; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 24b8766..da1fbfb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -81,6 +81,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -495,7 +496,7 @@ public class TestRMContainerAllocator { ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null)); + new RMContainerRequestor.ContainerRequest(event1, null, null)); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -629,6 +630,91 @@ public class TestRMContainerAllocator { } @Test + public void testMapReduceAllocationWithNodeLabelExpression() throws Exception { + + LOG.info("Running testMapReduceAllocationWithNodeLabelExpression"); + Configuration conf = new Configuration(); + /* + * final int MAP_LIMIT = 3; final int REDUCE_LIMIT = 1; + * conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT); + * conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT); + */ + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f); + conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes"); + conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes"); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + final MockScheduler mockScheduler = new MockScheduler(appAttemptId); + MyContainerAllocator allocator = + new MyContainerAllocator(null, conf, appAttemptId, mockJob) { + @Override + protected void register() { + } + + @Override + protected ApplicationMasterProtocol createSchedulerProxy() { + return mockScheduler; + } + }; + + // create some map requests + ContainerRequestEvent reqMapEvents; + reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" }); + allocator.sendRequests(Arrays.asList(reqMapEvents)); + + // create some reduce requests + ContainerRequestEvent reqReduceEvents; + reqReduceEvents = + createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true); + allocator.sendRequests(Arrays.asList(reqReduceEvents)); + allocator.schedule(); + // verify all of the host-specific asks were sent plus one for the + // default rack and one for the ANY request + Assert.assertEquals(3, mockScheduler.lastAsk.size()); + // verify ResourceRequest sent for MAP have appropriate node + // label expression as per the configuration + validateLabelsRequests(mockScheduler.lastAsk.get(0), false); + validateLabelsRequests(mockScheduler.lastAsk.get(1), false); + validateLabelsRequests(mockScheduler.lastAsk.get(2), false); + + // assign a map task and verify we do not ask for any more maps + ContainerId cid0 = mockScheduler.assignContainer("map", false); + allocator.schedule(); + // default rack and one for the ANY request + Assert.assertEquals(3, mockScheduler.lastAsk.size()); + validateLabelsRequests(mockScheduler.lastAsk.get(0), true); + validateLabelsRequests(mockScheduler.lastAsk.get(1), true); + validateLabelsRequests(mockScheduler.lastAsk.get(2), true); + + // complete the map task and verify that we ask for one more + allocator.close(); + } + + private void validateLabelsRequests(ResourceRequest resourceRequest, + boolean isReduce) { + switch (resourceRequest.getResourceName()) { + case "map": + case "reduce": + case NetworkTopology.DEFAULT_RACK: + Assert.assertNull(resourceRequest.getNodeLabelExpression()); + break; + case "*": + Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes", + resourceRequest.getNodeLabelExpression()); + break; + default: + Assert.fail("Invalid resource location " + + resourceRequest.getResourceName()); + } + } + + @Test public void testMapReduceScheduling() throws Exception { LOG.info("Running testMapReduceScheduling"); @@ -1566,6 +1652,7 @@ public class TestRMContainerAllocator { .getNumContainers(), req.getRelaxLocality()); askCopy.add(reqCopy); } + SecurityUtil.setTokenServiceUseIp(false); lastAsk = ask; lastRelease = release; lastBlacklistAdditions = blacklistAdditions; http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index c028aaf..09e1002 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -63,6 +63,26 @@ public interface MRJobConfig { public static final String QUEUE_NAME = "mapreduce.job.queuename"; + /** + * Node Label expression applicable for all Job containers. + */ + public static final String JOB_NODE_LABEL_EXP = "mapreduce.job.node-label-expression"; + + /** + * Node Label expression applicable for AM containers. + */ + public static final String AM_NODE_LABEL_EXP = "mapreduce.job.am.node-label-expression"; + + /** + * Node Label expression applicable for map containers. + */ + public static final String MAP_NODE_LABEL_EXP = "mapreduce.map.node-label-expression"; + + /** + * Node Label expression applicable for reduce containers. + */ + public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression"; + public static final String RESERVATION_ID = "mapreduce.job.reservation.id"; public static final String JOB_TAGS = "mapreduce.job.tags"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index e9cb3f9..d01dd5f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1584,6 +1584,41 @@ <!-- MR YARN Application properties --> <property> + <name>mapreduce.job.node-label-expression</name> + <description>All the containers of the Map Reduce job will be run with this + node label expression. If the node-label-expression for job is not set, then + it will use queue's default-node-label-expression for all job's containers. + </description> +</property> + +<property> + <name>mapreduce.job.am.node-label-expression</name> + <description>This is node-label configuration for Map Reduce Application Master + container. If not configured it will make use of + mapreduce.job.node-label-expression and if job's node-label expression is not + configured then it will use queue's default-node-label-expression. + </description> +</property> + +<property> + <name>mapreduce.map.node-label-expression</name> + <description>This is node-label configuration for Map task containers. If not + configured it will use mapreduce.job.node-label-expression and if job's + node-label expression is not configured then it will use queue's + default-node-label-expression. + </description> +</property> + +<property> + <name>mapreduce.reduce.node-label-expression</name> + <description>This is node-label configuration for Reduce task containers. If + not configured it will use mapreduce.job.node-label-expression and if job's + node-label expression is not configured then it will use queue's + default-node-label-expression. + </description> +</property> + +<property> <name>mapreduce.job.counters.limit</name> <value>120</value> <description>Limit on the number of user counters allowed per job. http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 510f099..624345c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -76,8 +76,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -97,7 +99,15 @@ public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private final static RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + public final static Priority AM_CONTAINER_PRIORITY = recordFactory + .newRecordInstance(Priority.class); + static { + AM_CONTAINER_PRIORITY.setPriority(0); + } + private ResourceMgrDelegate resMgrDelegate; private ClientCache clientCache; private Configuration conf; @@ -530,6 +540,24 @@ public class YARNRunner implements ClientProtocol { conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); appContext.setResource(capability); + + // set labels for the AM container request if present + String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP); + if (null != amNodelabelExpression + && amNodelabelExpression.trim().length() != 0) { + ResourceRequest amResourceRequest = + recordFactory.newRecordInstance(ResourceRequest.class); + amResourceRequest.setPriority(AM_CONTAINER_PRIORITY); + amResourceRequest.setResourceName(ResourceRequest.ANY); + amResourceRequest.setCapability(capability); + amResourceRequest.setNumContainers(1); + amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim()); + appContext.setAMContainerResourceRequest(amResourceRequest); + } + // set labels for the Job containers + appContext.setNodeLabelExpression(jobConf + .get(JobContext.JOB_NODE_LABEL_EXP)); + appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE); if (tagsFromConf != null && !tagsFromConf.isEmpty()) { appContext.setApplicationTags(new HashSet<String>(tagsFromConf)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42c2d36b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index e0db094..470218e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -563,6 +563,22 @@ public class TestYARNRunner extends TestCase { testAMStandardEnv(true); } + @Test + public void testNodeLabelExp() throws Exception { + JobConf jobConf = new JobConf(); + + jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU"); + jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem"); + + YARNRunner yarnRunner = new YARNRunner(jobConf); + ApplicationSubmissionContext appSubCtx = + buildSubmitContext(yarnRunner, jobConf); + + assertEquals(appSubCtx.getNodeLabelExpression(), "GPU"); + assertEquals(appSubCtx.getAMContainerResourceRequest() + .getNodeLabelExpression(), "highMem"); + } + private void testAMStandardEnv(boolean customLibPath) throws Exception { // the Windows behavior is different and this test currently doesn't really // apply --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org