MAPREDUCE-5583. Ability to limit running map and reduce tasks. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68c9b55e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68c9b55e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68c9b55e Branch: refs/heads/HDFS-7285 Commit: 68c9b55e9d3ff5959b750502724d9c3db23171c1 Parents: 4a3ef07 Author: Junping Du <junping...@apache.org> Authored: Tue Mar 3 02:01:04 2015 -0800 Committer: Jing Zhao <ji...@apache.org> Committed: Mon Mar 9 13:11:22 2015 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/rm/RMContainerAllocator.java | 65 +++++- .../v2/app/rm/RMContainerRequestor.java | 74 ++++++- .../v2/app/rm/TestRMContainerAllocator.java | 214 +++++++++++++++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 8 + .../src/main/resources/mapred-default.xml | 16 ++ 6 files changed, 363 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5524b14..7a2eff3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -258,6 +258,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv) + MAPREDUCE-5583. Ability to limit running map and reduce tasks. + (Jason Lowe via junping_du) + IMPROVEMENTS MAPREDUCE-6149. Document override log4j.properties in MR job. http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1acfeec..efea674 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -99,9 +99,9 @@ public class RMContainerAllocator extends RMContainerRequestor public static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; - private static final Priority PRIORITY_FAST_FAIL_MAP; - private static final Priority PRIORITY_REDUCE; - private static final Priority PRIORITY_MAP; + static final Priority PRIORITY_FAST_FAIL_MAP; + static final Priority PRIORITY_REDUCE; + static final Priority PRIORITY_MAP; @VisibleForTesting public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted " @@ -166,6 +166,8 @@ public class RMContainerAllocator extends RMContainerRequestor */ private long allocationDelayThresholdMs = 0; private float reduceSlowStart = 0; + private int maxRunningMaps = 0; + private int maxRunningReduces = 0; private long retryInterval; private long retrystartTime; private Clock clock; @@ -201,6 +203,10 @@ public class RMContainerAllocator extends RMContainerRequestor allocationDelayThresholdMs = conf.getInt( MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms + maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, + MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT); + maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, + MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT); RackResolver.init(conf); retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); @@ -664,6 +670,8 @@ public class RMContainerAllocator extends RMContainerRequestor @SuppressWarnings("unchecked") private List<Container> getResources() throws Exception { + applyConcurrentTaskLimits(); + // will be null the first time Resource headRoom = getAvailableResources() == null ? Resources.none() : @@ -778,6 +786,43 @@ public class RMContainerAllocator extends RMContainerRequestor return newContainers; } + private void applyConcurrentTaskLimits() { + int numScheduledMaps = scheduledRequests.maps.size(); + if (maxRunningMaps > 0 && numScheduledMaps > 0) { + int maxRequestedMaps = Math.max(0, + maxRunningMaps - assignedRequests.maps.size()); + int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size(); + int failedMapRequestLimit = Math.min(maxRequestedMaps, + numScheduledFailMaps); + int normalMapRequestLimit = Math.min( + maxRequestedMaps - failedMapRequestLimit, + numScheduledMaps - numScheduledFailMaps); + setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest, + failedMapRequestLimit); + setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit); + } + + int numScheduledReduces = scheduledRequests.reduces.size(); + if (maxRunningReduces > 0 && numScheduledReduces > 0) { + int maxRequestedReduces = Math.max(0, + maxRunningReduces - assignedRequests.reduces.size()); + int reduceRequestLimit = Math.min(maxRequestedReduces, + numScheduledReduces); + setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest, + reduceRequestLimit); + } + } + + private boolean canAssignMaps() { + return (maxRunningMaps <= 0 + || assignedRequests.maps.size() < maxRunningMaps); + } + + private boolean canAssignReduces() { + return (maxRunningReduces <= 0 + || assignedRequests.reduces.size() < maxRunningReduces); + } + private void updateAMRMToken(Token token) throws IOException { org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token @@ -1046,8 +1091,7 @@ public class RMContainerAllocator extends RMContainerRequestor it = allocatedContainers.iterator(); while (it.hasNext()) { Container allocated = it.next(); - LOG.info("Releasing unassigned and invalid container " - + allocated + ". RM may have assignment issues"); + LOG.info("Releasing unassigned container " + allocated); containerNotAssigned(allocated); } } @@ -1150,7 +1194,8 @@ public class RMContainerAllocator extends RMContainerRequestor private ContainerRequest assignToFailedMap(Container allocated) { //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; - while (assigned == null && earlierFailedMaps.size() > 0) { + while (assigned == null && earlierFailedMaps.size() > 0 + && canAssignMaps()) { TaskAttemptId tId = earlierFailedMaps.removeFirst(); if (maps.containsKey(tId)) { assigned = maps.remove(tId); @@ -1168,7 +1213,7 @@ public class RMContainerAllocator extends RMContainerRequestor private ContainerRequest assignToReduce(Container allocated) { ContainerRequest assigned = null; //try to assign to reduces if present - if (assigned == null && reduces.size() > 0) { + if (assigned == null && reduces.size() > 0 && canAssignReduces()) { TaskAttemptId tId = reduces.keySet().iterator().next(); assigned = reduces.remove(tId); LOG.info("Assigned to reduce"); @@ -1180,7 +1225,7 @@ public class RMContainerAllocator extends RMContainerRequestor private void assignMapsWithLocality(List<Container> allocatedContainers) { // try to assign to all nodes first to match node local Iterator<Container> it = allocatedContainers.iterator(); - while(it.hasNext() && maps.size() > 0){ + while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ Container allocated = it.next(); Priority priority = allocated.getPriority(); assert PRIORITY_MAP.equals(priority); @@ -1212,7 +1257,7 @@ public class RMContainerAllocator extends RMContainerRequestor // try to match all rack local it = allocatedContainers.iterator(); - while(it.hasNext() && maps.size() > 0){ + while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ Container allocated = it.next(); Priority priority = allocated.getPriority(); assert PRIORITY_MAP.equals(priority); @@ -1242,7 +1287,7 @@ public class RMContainerAllocator extends RMContainerRequestor // assign remaining it = allocatedContainers.iterator(); - while(it.hasNext() && maps.size() > 0){ + while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ Container allocated = it.next(); Priority priority = allocated.getPriority(); assert PRIORITY_MAP.equals(priority); http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index bb9ad02..1666864 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -58,6 +60,8 @@ import com.google.common.annotations.VisibleForTesting; public abstract class RMContainerRequestor extends RMCommunicator { private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); + private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR = + new ResourceRequestComparator(); protected int lastResponseID; private Resource availableResources; @@ -77,12 +81,18 @@ public abstract class RMContainerRequestor extends RMCommunicator { // use custom comparator to make sure ResourceRequest objects differing only in // numContainers dont end up as duplicates private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>( - new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); + RESOURCE_REQUEST_COMPARATOR); private final Set<ContainerId> release = new TreeSet<ContainerId>(); // pendingRelease holds history or release requests.request is removed only if // RM sends completedContainer. // How it different from release? --> release is for per allocate() request. protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>(); + + private final Map<ResourceRequest,ResourceRequest> requestLimits = + new TreeMap<ResourceRequest,ResourceRequest>(RESOURCE_REQUEST_COMPARATOR); + private final Set<ResourceRequest> requestLimitsToUpdate = + new TreeSet<ResourceRequest>(RESOURCE_REQUEST_COMPARATOR); + private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false); @@ -178,6 +188,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { protected AllocateResponse makeRemoteRequest() throws YarnException, IOException { + applyRequestLimits(); ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions), new ArrayList<String>(blacklistRemovals)); @@ -190,13 +201,14 @@ public abstract class RMContainerRequestor extends RMCommunicator { availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; clusterNmCount = allocateResponse.getNumClusterNodes(); + int numCompletedContainers = + allocateResponse.getCompletedContainersStatuses().size(); if (ask.size() > 0 || release.size() > 0) { LOG.info("getResources() for " + applicationId + ":" + " ask=" + ask.size() + " release= " + release.size() + " newContainers=" + allocateResponse.getAllocatedContainers().size() - + " finishedContainers=" - + allocateResponse.getCompletedContainersStatuses().size() + + " finishedContainers=" + numCompletedContainers + " resourcelimit=" + availableResources + " knownNMs=" + clusterNmCount); } @@ -204,6 +216,12 @@ public abstract class RMContainerRequestor extends RMCommunicator { ask.clear(); release.clear(); + if (numCompletedContainers > 0) { + // re-send limited requests when a container completes to trigger asking + // for more containers + requestLimitsToUpdate.addAll(requestLimits.keySet()); + } + if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) { LOG.info("Update the blacklist for " + applicationId + ": blacklistAdditions=" + blacklistAdditions.size() + @@ -214,6 +232,36 @@ public abstract class RMContainerRequestor extends RMCommunicator { return allocateResponse; } + private void applyRequestLimits() { + Iterator<ResourceRequest> iter = requestLimits.values().iterator(); + while (iter.hasNext()) { + ResourceRequest reqLimit = iter.next(); + int limit = reqLimit.getNumContainers(); + Map<String, Map<Resource, ResourceRequest>> remoteRequests = + remoteRequestsTable.get(reqLimit.getPriority()); + Map<Resource, ResourceRequest> reqMap = (remoteRequests != null) + ? remoteRequests.get(ResourceRequest.ANY) : null; + ResourceRequest req = (reqMap != null) + ? reqMap.get(reqLimit.getCapability()) : null; + if (req == null) { + continue; + } + // update an existing ask or send a new one if updating + if (ask.remove(req) || requestLimitsToUpdate.contains(req)) { + ResourceRequest newReq = req.getNumContainers() > limit + ? reqLimit : req; + ask.add(newReq); + LOG.info("Applying ask limit of " + newReq.getNumContainers() + + " for priority:" + reqLimit.getPriority() + + " and capability:" + reqLimit.getCapability()); + } + if (limit == Integer.MAX_VALUE) { + iter.remove(); + } + } + requestLimitsToUpdate.clear(); + } + protected void addOutstandingRequestOnResync() { for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable .values()) { @@ -229,6 +277,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { if (!pendingRelease.isEmpty()) { release.addAll(pendingRelease); } + requestLimitsToUpdate.addAll(requestLimits.keySet()); } // May be incorrect if there's multiple NodeManagers running on a single host. @@ -459,10 +508,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // because objects inside the resource map can be deleted ask can end up // containing an object that matches new resource object but with different - // numContainers. So exisintg values must be replaced explicitly - if(ask.contains(remoteRequest)) { - ask.remove(remoteRequest); - } + // numContainers. So existing values must be replaced explicitly + ask.remove(remoteRequest); ask.add(remoteRequest); } @@ -490,6 +537,19 @@ public abstract class RMContainerRequestor extends RMCommunicator { return newReq; } + protected void setRequestLimit(Priority priority, Resource capability, + int limit) { + if (limit < 0) { + limit = Integer.MAX_VALUE; + } + ResourceRequest newReqLimit = ResourceRequest.newInstance(priority, + ResourceRequest.ANY, capability, limit); + ResourceRequest oldReqLimit = requestLimits.put(newReqLimit, newReqLimit); + if (oldReqLimit == null || oldReqLimit.getNumContainers() < limit) { + requestLimitsToUpdate.add(newReqLimit); + } + } + public Set<String> getBlacklistedNodes() { return blacklistedNodes; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 4759693..eca1a4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -31,9 +31,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -81,7 +83,13 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -89,6 +97,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -2387,6 +2399,208 @@ public class TestRMContainerAllocator { new Text(rmAddr), ugiToken.getService()); } + @Test + public void testConcurrentTaskLimits() throws Exception { + final int MAP_LIMIT = 3; + final int REDUCE_LIMIT = 1; + LOG.info("Running testConcurrentTaskLimits"); + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT); + conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT); + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 1); + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + final MockScheduler mockScheduler = new MockScheduler(appAttemptId); + MyContainerAllocator allocator = new MyContainerAllocator(null, conf, + appAttemptId, mockJob) { + @Override + protected void register() { + } + + @Override + protected ApplicationMasterProtocol createSchedulerProxy() { + return mockScheduler; + } + }; + + // create some map requests + ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5]; + for (int i = 0; i < reqMapEvents.length; ++i) { + reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); + } + allocator.sendRequests(Arrays.asList(reqMapEvents)); + + // create some reduce requests + ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2]; + for (int i = 0; i < reqReduceEvents.length; ++i) { + reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, + false, true); + } + allocator.sendRequests(Arrays.asList(reqReduceEvents)); + allocator.schedule(); + + // verify all of the host-specific asks were sent plus one for the + // default rack and one for the ANY request + Assert.assertEquals(reqMapEvents.length + 2, mockScheduler.lastAsk.size()); + + // verify AM is only asking for the map limit overall + Assert.assertEquals(MAP_LIMIT, mockScheduler.lastAnyAskMap); + + // assign a map task and verify we do not ask for any more maps + ContainerId cid0 = mockScheduler.assignContainer("h0", false); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(2, mockScheduler.lastAnyAskMap); + + // complete the map task and verify that we ask for one more + mockScheduler.completeContainer(cid0); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(3, mockScheduler.lastAnyAskMap); + + // assign three more maps and verify we ask for no more maps + ContainerId cid1 = mockScheduler.assignContainer("h1", false); + ContainerId cid2 = mockScheduler.assignContainer("h2", false); + ContainerId cid3 = mockScheduler.assignContainer("h3", false); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(0, mockScheduler.lastAnyAskMap); + + // complete two containers and verify we only asked for one more + // since at that point all maps should be scheduled/completed + mockScheduler.completeContainer(cid2); + mockScheduler.completeContainer(cid3); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(1, mockScheduler.lastAnyAskMap); + + // allocate the last container and complete the first one + // and verify there are no more map asks. + mockScheduler.completeContainer(cid1); + ContainerId cid4 = mockScheduler.assignContainer("h4", false); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(0, mockScheduler.lastAnyAskMap); + + // complete the last map + mockScheduler.completeContainer(cid4); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(0, mockScheduler.lastAnyAskMap); + + // verify only reduce limit being requested + Assert.assertEquals(REDUCE_LIMIT, mockScheduler.lastAnyAskReduce); + + // assign a reducer and verify ask goes to zero + cid0 = mockScheduler.assignContainer("h0", true); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); + + // complete the reducer and verify we ask for another + mockScheduler.completeContainer(cid0); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(1, mockScheduler.lastAnyAskReduce); + + // assign a reducer and verify ask goes to zero + cid0 = mockScheduler.assignContainer("h0", true); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); + + // complete the reducer and verify no more reducers + mockScheduler.completeContainer(cid0); + allocator.schedule(); + allocator.schedule(); + Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); + allocator.close(); + } + + private static class MockScheduler implements ApplicationMasterProtocol { + ApplicationAttemptId attemptId; + long nextContainerId = 10; + List<ResourceRequest> lastAsk = null; + int lastAnyAskMap = 0; + int lastAnyAskReduce = 0; + List<ContainerStatus> containersToComplete = + new ArrayList<ContainerStatus>(); + List<Container> containersToAllocate = new ArrayList<Container>(); + + public MockScheduler(ApplicationAttemptId attemptId) { + this.attemptId = attemptId; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return RegisterApplicationMasterResponse.newInstance( + Resource.newInstance(512, 1), + Resource.newInstance(512000, 1024), + Collections.<ApplicationAccessType,String>emptyMap(), + ByteBuffer.wrap("fake_key".getBytes()), + Collections.<Container>emptyList(), + "default", + Collections.<NMToken>emptyList()); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return FinishApplicationMasterResponse.newInstance(false); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + lastAsk = request.getAskList(); + for (ResourceRequest req : lastAsk) { + if (ResourceRequest.ANY.equals(req.getResourceName())) { + Priority priority = req.getPriority(); + if (priority.equals(RMContainerAllocator.PRIORITY_MAP)) { + lastAnyAskMap = req.getNumContainers(); + } else if (priority.equals(RMContainerAllocator.PRIORITY_REDUCE)){ + lastAnyAskReduce = req.getNumContainers(); + } + } + } + AllocateResponse response = AllocateResponse.newInstance( + request.getResponseId(), + containersToComplete, containersToAllocate, + Collections.<NodeReport>emptyList(), + Resource.newInstance(512000, 1024), null, 10, null, + Collections.<NMToken>emptyList()); + containersToComplete.clear(); + containersToAllocate.clear(); + return response; + } + + public ContainerId assignContainer(String nodeName, boolean isReduce) { + ContainerId containerId = + ContainerId.newContainerId(attemptId, nextContainerId++); + Priority priority = isReduce ? RMContainerAllocator.PRIORITY_REDUCE + : RMContainerAllocator.PRIORITY_MAP; + Container container = Container.newInstance(containerId, + NodeId.newInstance(nodeName, 1234), nodeName + ":5678", + Resource.newInstance(1024, 1), priority, null); + containersToAllocate.add(container); + return containerId; + } + + public void completeContainer(ContainerId containerId) { + containersToComplete.add(ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, "", 0)); + } + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index d06b075..5527103 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -373,6 +373,14 @@ public interface MRJobConfig { public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " "; + public static final String JOB_RUNNING_MAP_LIMIT = + "mapreduce.job.running.map.limit"; + public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0; + + public static final String JOB_RUNNING_REDUCE_LIMIT = + "mapreduce.job.running.reduce.limit"; + public static final int DEFAULT_JOB_RUNNING_REDUCE_LIMIT = 0; + /* config for tracking the local file where all the credentials for the job * credentials. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c9b55e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 6e80679..d864756 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -83,6 +83,22 @@ </property> <property> + <name>mapreduce.job.running.map.limit</name> + <value>0</value> + <description>The maximum number of simultaneous map tasks per job. + There is no limit if this value is 0 or negative. + </description> +</property> + +<property> + <name>mapreduce.job.running.reduce.limit</name> + <value>0</value> + <description>The maximum number of simultaneous reduce tasks per job. + There is no limit if this value is 0 or negative. + </description> +</property> + +<property> <name>mapreduce.job.reducer.preempt.delay.sec</name> <value>0</value> <description>The threshold in terms of seconds after which an unsatisfied mapper