MAPREDUCE-5583. Ability to limit running map and reduce tasks. Contributed by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68c9b55e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68c9b55e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68c9b55e

Branch: refs/heads/HDFS-7285
Commit: 68c9b55e9d3ff5959b750502724d9c3db23171c1
Parents: 4a3ef07
Author: Junping Du <junping...@apache.org>
Authored: Tue Mar 3 02:01:04 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Mar 9 13:11:22 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/rm/RMContainerAllocator.java         |  65 +++++-
 .../v2/app/rm/RMContainerRequestor.java         |  74 ++++++-
 .../v2/app/rm/TestRMContainerAllocator.java     | 214 +++++++++++++++++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   8 +
 .../src/main/resources/mapred-default.xml       |  16 ++
 6 files changed, 363 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
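
For context: this commit adds two per-job configuration properties,
mapreduce.job.running.map.limit and mapreduce.job.running.reduce.limit, that cap
how many map and reduce tasks may run concurrently; 0 or a negative value (the
default is 0) means no limit. A minimal sketch of enabling the limits at job
submission time (the class and job name below are illustrative, not part of this
commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class LimitedJobExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Cap the job at 10 concurrently running maps and 2 concurrently
    // running reduces; 0 or negative disables the cap.
    conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, 10);
    conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, 2);
    Job job = Job.getInstance(conf, "concurrency-limited-job");
    // ... configure mapper/reducer, input and output paths, then submit.
  }
}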


http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5524b14..7a2eff3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -258,6 +258,9 @@ Release 2.7.0 - UNRELEASED
 
     MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv)
 
+    MAPREDUCE-5583. Ability to limit running map and reduce tasks. 
+    (Jason Lowe via junping_du)
+
   IMPROVEMENTS
 
     MAPREDUCE-6149. Document override log4j.properties in MR job.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 1acfeec..efea674 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -99,9 +99,9 @@ public class RMContainerAllocator extends RMContainerRequestor
   public static final 
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   
-  private static final Priority PRIORITY_FAST_FAIL_MAP;
-  private static final Priority PRIORITY_REDUCE;
-  private static final Priority PRIORITY_MAP;
+  static final Priority PRIORITY_FAST_FAIL_MAP;
+  static final Priority PRIORITY_REDUCE;
+  static final Priority PRIORITY_MAP;
 
   @VisibleForTesting
   public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
@@ -166,6 +166,8 @@ public class RMContainerAllocator extends RMContainerRequestor
    */
   private long allocationDelayThresholdMs = 0;
   private float reduceSlowStart = 0;
+  private int maxRunningMaps = 0;
+  private int maxRunningReduces = 0;
   private long retryInterval;
   private long retrystartTime;
   private Clock clock;
@@ -201,6 +203,10 @@ public class RMContainerAllocator extends RMContainerRequestor
     allocationDelayThresholdMs = conf.getInt(
         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
+    maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
+        MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
+    maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
+        MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -664,6 +670,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
+    applyConcurrentTaskLimits();
+
     // will be null the first time
     Resource headRoom =
         getAvailableResources() == null ? Resources.none() :
@@ -778,6 +786,43 @@ public class RMContainerAllocator extends RMContainerRequestor
     return newContainers;
   }
 
+  private void applyConcurrentTaskLimits() {
+    int numScheduledMaps = scheduledRequests.maps.size();
+    if (maxRunningMaps > 0 && numScheduledMaps > 0) {
+      int maxRequestedMaps = Math.max(0,
+          maxRunningMaps - assignedRequests.maps.size());
+      int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
+      int failedMapRequestLimit = Math.min(maxRequestedMaps,
+          numScheduledFailMaps);
+      int normalMapRequestLimit = Math.min(
+          maxRequestedMaps - failedMapRequestLimit,
+          numScheduledMaps - numScheduledFailMaps);
+      setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
+          failedMapRequestLimit);
+      setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
+    }
+
+    int numScheduledReduces = scheduledRequests.reduces.size();
+    if (maxRunningReduces > 0 && numScheduledReduces > 0) {
+      int maxRequestedReduces = Math.max(0,
+          maxRunningReduces - assignedRequests.reduces.size());
+      int reduceRequestLimit = Math.min(maxRequestedReduces,
+          numScheduledReduces);
+      setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
+          reduceRequestLimit);
+    }
+  }
+
+  private boolean canAssignMaps() {
+    return (maxRunningMaps <= 0
+        || assignedRequests.maps.size() < maxRunningMaps);
+  }
+
+  private boolean canAssignReduces() {
+    return (maxRunningReduces <= 0
+        || assignedRequests.reduces.size() < maxRunningReduces);
+  }
+
   private void updateAMRMToken(Token token) throws IOException {
     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
         new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
@@ -1046,8 +1091,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       it = allocatedContainers.iterator();
       while (it.hasNext()) {
         Container allocated = it.next();
-        LOG.info("Releasing unassigned and invalid container " 
-            + allocated + ". RM may have assignment issues");
+        LOG.info("Releasing unassigned container " + allocated);
         containerNotAssigned(allocated);
       }
     }
@@ -1150,7 +1194,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     private ContainerRequest assignToFailedMap(Container allocated) {
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
-      while (assigned == null && earlierFailedMaps.size() > 0) {
+      while (assigned == null && earlierFailedMaps.size() > 0
+          && canAssignMaps()) {
         TaskAttemptId tId = earlierFailedMaps.removeFirst();      
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
@@ -1168,7 +1213,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     private ContainerRequest assignToReduce(Container allocated) {
       ContainerRequest assigned = null;
       //try to assign to reduces if present
-      if (assigned == null && reduces.size() > 0) {
+      if (assigned == null && reduces.size() > 0 && canAssignReduces()) {
         TaskAttemptId tId = reduces.keySet().iterator().next();
         assigned = reduces.remove(tId);
         LOG.info("Assigned to reduce");
@@ -1180,7 +1225,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     private void assignMapsWithLocality(List<Container> allocatedContainers) {
       // try to assign to all nodes first to match node local
       Iterator<Container> it = allocatedContainers.iterator();
-      while(it.hasNext() && maps.size() > 0){
+      while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();        
         Priority priority = allocated.getPriority();
         assert PRIORITY_MAP.equals(priority);
@@ -1212,7 +1257,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       
       // try to match all rack local
       it = allocatedContainers.iterator();
-      while(it.hasNext() && maps.size() > 0){
+      while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
         assert PRIORITY_MAP.equals(priority);
@@ -1242,7 +1287,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       
       // assign remaining
       it = allocatedContainers.iterator();
-      while(it.hasNext() && maps.size() > 0){
+      while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
         assert PRIORITY_MAP.equals(priority);
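
The map-side budget in applyConcurrentTaskLimits() gives earlier-failed maps
(requested at PRIORITY_FAST_FAIL_MAP) first claim on the remaining headroom,
and normal maps get whatever is left. A standalone sketch of that arithmetic
with hypothetical counts, not code from the patch:

// maxRunningMaps = 3, 1 map already assigned, 4 maps still scheduled,
// 1 of which previously failed.
int maxRunningMaps = 3, assignedMaps = 1;
int scheduledMaps = 4, scheduledFailMaps = 1;
int maxRequestedMaps = Math.max(0, maxRunningMaps - assignedMaps);   // 2
int failedMapLimit = Math.min(maxRequestedMaps, scheduledFailMaps);  // 1
int normalMapLimit = Math.min(maxRequestedMaps - failedMapLimit,
    scheduledMaps - scheduledFailMaps);                              // 1
// The AM then asks the RM for at most 1 fast-fail map and 1 normal map.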

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index bb9ad02..1666864 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -58,6 +60,8 @@ import com.google.common.annotations.VisibleForTesting;
 public abstract class RMContainerRequestor extends RMCommunicator {
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+  private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR =
+      new ResourceRequestComparator();
 
   protected int lastResponseID;
   private Resource availableResources;
@@ -77,12 +81,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   // use custom comparator to make sure ResourceRequest objects differing only in
   // numContainers dont end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
-      new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
+      RESOURCE_REQUEST_COMPARATOR);
   private final Set<ContainerId> release = new TreeSet<ContainerId>();
  // pendingRelease holds history or release requests.request is removed only if
   // RM sends completedContainer.
   // How it different from release? --> release is for per allocate() request.
   protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
+
+  private final Map<ResourceRequest,ResourceRequest> requestLimits =
+      new TreeMap<ResourceRequest,ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
+  private final Set<ResourceRequest> requestLimitsToUpdate =
+      new TreeSet<ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
+
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -178,6 +188,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
 
   protected AllocateResponse makeRemoteRequest() throws YarnException,
       IOException {
+    applyRequestLimits();
     ResourceBlacklistRequest blacklistRequest =
         ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
             new ArrayList<String>(blacklistRemovals));
@@ -190,13 +201,14 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
     clusterNmCount = allocateResponse.getNumClusterNodes();
+    int numCompletedContainers =
+        allocateResponse.getCompletedContainersStatuses().size();
 
     if (ask.size() > 0 || release.size() > 0) {
       LOG.info("getResources() for " + applicationId + ":" + " ask="
           + ask.size() + " release= " + release.size() + " newContainers="
           + allocateResponse.getAllocatedContainers().size()
-          + " finishedContainers="
-          + allocateResponse.getCompletedContainersStatuses().size()
+          + " finishedContainers=" + numCompletedContainers
           + " resourcelimit=" + availableResources + " knownNMs="
           + clusterNmCount);
     }
@@ -204,6 +216,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     ask.clear();
     release.clear();
 
+    if (numCompletedContainers > 0) {
+      // re-send limited requests when a container completes to trigger asking
+      // for more containers
+      requestLimitsToUpdate.addAll(requestLimits.keySet());
+    }
+
     if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
       LOG.info("Update the blacklist for " + applicationId +
           ": blacklistAdditions=" + blacklistAdditions.size() +
@@ -214,6 +232,36 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     return allocateResponse;
   }
 
+  private void applyRequestLimits() {
+    Iterator<ResourceRequest> iter = requestLimits.values().iterator();
+    while (iter.hasNext()) {
+      ResourceRequest reqLimit = iter.next();
+      int limit = reqLimit.getNumContainers();
+      Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+          remoteRequestsTable.get(reqLimit.getPriority());
+      Map<Resource, ResourceRequest> reqMap = (remoteRequests != null)
+          ? remoteRequests.get(ResourceRequest.ANY) : null;
+      ResourceRequest req = (reqMap != null)
+          ? reqMap.get(reqLimit.getCapability()) : null;
+      if (req == null) {
+        continue;
+      }
+      // update an existing ask or send a new one if updating
+      if (ask.remove(req) || requestLimitsToUpdate.contains(req)) {
+        ResourceRequest newReq = req.getNumContainers() > limit
+            ? reqLimit : req;
+        ask.add(newReq);
+        LOG.info("Applying ask limit of " + newReq.getNumContainers()
+            + " for priority:" + reqLimit.getPriority()
+            + " and capability:" + reqLimit.getCapability());
+      }
+      if (limit == Integer.MAX_VALUE) {
+        iter.remove();
+      }
+    }
+    requestLimitsToUpdate.clear();
+  }
+
   protected void addOutstandingRequestOnResync() {
     for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
         .values()) {
@@ -229,6 +277,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     if (!pendingRelease.isEmpty()) {
       release.addAll(pendingRelease);
     }
+    requestLimitsToUpdate.addAll(requestLimits.keySet());
   }
 
  // May be incorrect if there's multiple NodeManagers running on a single host.
@@ -459,10 +508,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // because objects inside the resource map can be deleted ask can end up 
     // containing an object that matches new resource object but with different
-    // numContainers. So exisintg values must be replaced explicitly
-    if(ask.contains(remoteRequest)) {
-      ask.remove(remoteRequest);
-    }
+    // numContainers. So existing values must be replaced explicitly
+    ask.remove(remoteRequest);
     ask.add(remoteRequest);    
   }
 
@@ -490,6 +537,19 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     return newReq;
   }
   
+  protected void setRequestLimit(Priority priority, Resource capability,
+      int limit) {
+    if (limit < 0) {
+      limit = Integer.MAX_VALUE;
+    }
+    ResourceRequest newReqLimit = ResourceRequest.newInstance(priority,
+        ResourceRequest.ANY, capability, limit);
+    ResourceRequest oldReqLimit = requestLimits.put(newReqLimit, newReqLimit);
+    if (oldReqLimit == null || oldReqLimit.getNumContainers() < limit) {
+      requestLimitsToUpdate.add(newReqLimit);
+    }
+  }
+
   public Set<String> getBlacklistedNodes() {
     return blacklistedNodes;
   }
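
Two details of the requestor changes are worth noting: setRequestLimit()
normalizes a negative limit to Integer.MAX_VALUE (effectively "unlimited", and
applyRequestLimits() drops such an entry once it has been applied), and
makeRemoteRequest() re-marks every stored limit for update whenever containers
complete, so a capped ask is re-sent as headroom frees up. A standalone sketch
of the capping rule itself, with hypothetical counts:

// The outstanding ANY-resource ask is replaced by the limit entry only when
// it requests more containers than the limit allows.
int outstanding = 7;  // numContainers on the existing ANY request
int limit = 3;        // numContainers on the stored limit entry
int askSentToRM = (outstanding > limit) ? limit : outstanding;  // 3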

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 4759693..eca1a4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -31,9 +31,11 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -81,7 +83,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -89,6 +97,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -2387,6 +2399,208 @@ public class TestRMContainerAllocator {
         new Text(rmAddr), ugiToken.getService());
   }
 
+  @Test
+  public void testConcurrentTaskLimits() throws Exception {
+    final int MAP_LIMIT = 3;
+    final int REDUCE_LIMIT = 1;
+    LOG.info("Running testConcurrentTaskLimits");
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+    conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
+        appAttemptId, mockJob) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+    };
+
+    // create some map requests
+    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+    for (int i = 0; i < reqMapEvents.length; ++i) {
+      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+    }
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+    // create some reduce requests
+    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+    for (int i = 0; i < reqReduceEvents.length; ++i) {
+      reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
+          false, true);
+    }
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+
+    // verify all of the host-specific asks were sent plus one for the
+    // default rack and one for the ANY request
+    Assert.assertEquals(reqMapEvents.length + 2, mockScheduler.lastAsk.size());
+
+    // verify AM is only asking for the map limit overall
+    Assert.assertEquals(MAP_LIMIT, mockScheduler.lastAnyAskMap);
+
+    // assign a map task and verify we do not ask for any more maps
+    ContainerId cid0 = mockScheduler.assignContainer("h0", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(2, mockScheduler.lastAnyAskMap);
+
+    // complete the map task and verify that we ask for one more
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(3, mockScheduler.lastAnyAskMap);
+
+    // assign three more maps and verify we ask for no more maps
+    ContainerId cid1 = mockScheduler.assignContainer("h1", false);
+    ContainerId cid2 = mockScheduler.assignContainer("h2", false);
+    ContainerId cid3 = mockScheduler.assignContainer("h3", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // complete two containers and verify we only asked for one more
+    // since at that point all maps should be scheduled/completed
+    mockScheduler.completeContainer(cid2);
+    mockScheduler.completeContainer(cid3);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(1, mockScheduler.lastAnyAskMap);
+
+    // allocate the last container and complete the first one
+    // and verify there are no more map asks.
+    mockScheduler.completeContainer(cid1);
+    ContainerId cid4 = mockScheduler.assignContainer("h4", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // complete the last map
+    mockScheduler.completeContainer(cid4);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // verify only reduce limit being requested
+    Assert.assertEquals(REDUCE_LIMIT, mockScheduler.lastAnyAskReduce);
+
+    // assign a reducer and verify ask goes to zero
+    cid0 = mockScheduler.assignContainer("h0", true);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+    // complete the reducer and verify we ask for another
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(1, mockScheduler.lastAnyAskReduce);
+
+    // assign a reducer and verify ask goes to zero
+    cid0 = mockScheduler.assignContainer("h0", true);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+    // complete the reducer and verify no more reducers
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+    allocator.close();
+  }
+
+  private static class MockScheduler implements ApplicationMasterProtocol {
+    ApplicationAttemptId attemptId;
+    long nextContainerId = 10;
+    List<ResourceRequest> lastAsk = null;
+    int lastAnyAskMap = 0;
+    int lastAnyAskReduce = 0;
+    List<ContainerStatus> containersToComplete =
+        new ArrayList<ContainerStatus>();
+    List<Container> containersToAllocate = new ArrayList<Container>();
+
+    public MockScheduler(ApplicationAttemptId attemptId) {
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return RegisterApplicationMasterResponse.newInstance(
+          Resource.newInstance(512, 1),
+          Resource.newInstance(512000, 1024),
+          Collections.<ApplicationAccessType,String>emptyMap(),
+          ByteBuffer.wrap("fake_key".getBytes()),
+          Collections.<Container>emptyList(),
+          "default",
+          Collections.<NMToken>emptyList());
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return FinishApplicationMasterResponse.newInstance(false);
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      lastAsk = request.getAskList();
+      for (ResourceRequest req : lastAsk) {
+        if (ResourceRequest.ANY.equals(req.getResourceName())) {
+          Priority priority = req.getPriority();
+          if (priority.equals(RMContainerAllocator.PRIORITY_MAP)) {
+            lastAnyAskMap = req.getNumContainers();
+          } else if (priority.equals(RMContainerAllocator.PRIORITY_REDUCE)){
+            lastAnyAskReduce = req.getNumContainers();
+          }
+        }
+      }
+      AllocateResponse response =  AllocateResponse.newInstance(
+          request.getResponseId(),
+          containersToComplete, containersToAllocate,
+          Collections.<NodeReport>emptyList(),
+          Resource.newInstance(512000, 1024), null, 10, null,
+          Collections.<NMToken>emptyList());
+      containersToComplete.clear();
+      containersToAllocate.clear();
+      return response;
+    }
+
+    public ContainerId assignContainer(String nodeName, boolean isReduce) {
+      ContainerId containerId =
+          ContainerId.newContainerId(attemptId, nextContainerId++);
+      Priority priority = isReduce ? RMContainerAllocator.PRIORITY_REDUCE
+          : RMContainerAllocator.PRIORITY_MAP;
+      Container container = Container.newInstance(containerId,
+          NodeId.newInstance(nodeName, 1234), nodeName + ":5678",
+        Resource.newInstance(1024, 1), priority, null);
+      containersToAllocate.add(container);
+      return containerId;
+    }
+
+    public void completeContainer(ContainerId containerId) {
+      containersToComplete.add(ContainerStatus.newInstance(containerId,
+          ContainerState.COMPLETE, "", 0));
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
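
A note on the test's shape: MockScheduler stands in for the RM side of
ApplicationMasterProtocol so the assertions can observe the exact ANY-ask sizes
per priority. The repeated schedule() calls are deliberate; the sketch below
restates that idiom from the test with explanatory comments:

// heartbeat 1: the allocate response delivers completed-container statuses,
// which marks the stored request limits as needing a re-send
allocator.schedule();
// heartbeat 2: applyRequestLimits() re-sends the recomputed, capped ask,
// which MockScheduler records in lastAnyAskMap/lastAnyAskReduce
allocator.schedule();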

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index d06b075..5527103 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -373,6 +373,14 @@ public interface MRJobConfig {
 
   public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
   
+  public static final String JOB_RUNNING_MAP_LIMIT =
+      "mapreduce.job.running.map.limit";
+  public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
+
+  public static final String JOB_RUNNING_REDUCE_LIMIT =
+      "mapreduce.job.running.reduce.limit";
+  public static final int DEFAULT_JOB_RUNNING_REDUCE_LIMIT = 0;
+
   /* config for tracking the local file where all the credentials for the job
    * credentials.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 6e80679..d864756 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -83,6 +83,22 @@
 </property>
 
 <property>
+  <name>mapreduce.job.running.map.limit</name>
+  <value>0</value>
+  <description>The maximum number of simultaneous map tasks per job.
+  There is no limit if this value is 0 or negative.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.running.reduce.limit</name>
+  <value>0</value>
+  <description>The maximum number of simultaneous reduce tasks per job.
+  There is no limit if this value is 0 or negative.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.job.reducer.preempt.delay.sec</name>
   <value>0</value>
   <description>The threshold in terms of seconds after which an unsatisfied mapper 
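
For reference, the AM reads the two new values during allocator initialization
(see the RMContainerAllocator.java hunk above); a value of 0 or less leaves
canAssignMaps()/canAssignReduces() permanently true. A short sketch of that
interpretation, using the same config constants as the patch:

int maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
    MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);  // defaults to 0
boolean mapLimitDisabled = (maxRunningMaps <= 0);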
