Author: mayank
Date: Wed Jul 2 01:54:47 2014
New Revision: 1607227
URL: http://svn.apache.org/r1607227
Log:
YARN-2022 Preempting an Application Master container can be kept as least
priority when multiple applications are marked for preemption by
ProportionalCapacityPreemptionPolicy (Sunil G via mayank)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jul 2 01:54:47 2014
@@ -55,6 +55,10 @@ Release 2.5.0 - UNRELEASED
(Varun Vasudev via vinodkv)
IMPROVEMENTS
+
+ YARN-2022 Preempting an Application Master container can be kept as
+ least priority when multiple applications are marked for preemption by
+ ProportionalCapacityPreemptionPolicy (Sunil G via mayank)
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
jeagles)
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Wed Jul 2 01:54:47 2014
@@ -111,7 +111,7 @@ public class ProportionalCapacityPreempt
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
- //the dispatcher to send preempt and kill events
+ // the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
private final Clock clock;
@@ -437,8 +437,9 @@ public class ProportionalCapacityPreempt
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
List<TempQueue> queues, Resource clusterResource) {
- Map<ApplicationAttemptId,Set<RMContainer>> list =
+ Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+ List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
for (TempQueue qT : queues) {
// we act only if we are violating balance by more than
@@ -449,26 +450,83 @@ public class ProportionalCapacityPreempt
// accounts for natural termination of containers
Resource resToObtain =
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+ Resource skippedAMSize = Resource.newInstance(0, 0);
// lock the leafqueue while we scan applications and unreserve
- synchronized(qT.leafQueue) {
- NavigableSet<FiCaSchedulerApp> ns =
- (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+ synchronized (qT.leafQueue) {
+ NavigableSet<FiCaSchedulerApp> ns =
+ (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
qT.actuallyPreempted = Resources.clone(resToObtain);
while (desc.hasNext()) {
FiCaSchedulerApp fc = desc.next();
- if (Resources.lessThanOrEqual(rc, clusterResource,
- resToObtain, Resources.none())) {
+ if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+ Resources.none())) {
break;
}
- list.put(fc.getApplicationAttemptId(),
- preemptFrom(fc, clusterResource, resToObtain));
+ preemptMap.put(
+ fc.getApplicationAttemptId(),
+ preemptFrom(fc, clusterResource, resToObtain,
+ skippedAMContainerlist, skippedAMSize));
}
+ Resource maxAMCapacityForThisQueue = Resources.multiply(
+ Resources.multiply(clusterResource,
+ qT.leafQueue.getAbsoluteCapacity()),
+ qT.leafQueue.getMaxAMResourcePerQueuePercent());
+
+ // Can try preempting AMContainers (still saving at most
+ // maxAMCapacityForThisQueue of AM resources) if more resources still
+ // need to be preempted from this queue.
+ preemptAMContainers(clusterResource, preemptMap,
+ skippedAMContainerlist, resToObtain, skippedAMSize,
+ maxAMCapacityForThisQueue);
}
}
}
- return list;
+ return preemptMap;
+ }
+
+ /**
+ * If more resources are needed for preemption, the saved AMContainers have
+ * to be rescanned. Such AMContainers can be preempted based on resToObtain,
+ * but maxAMCapacityForThisQueue resources will still be retained.
+ *
+ * @param clusterResource
+ * @param preemptMap
+ * @param skippedAMContainerlist
+ * @param resToObtain
+ * @param skippedAMSize
+ * @param maxAMCapacityForThisQueue
+ */
+ private void preemptAMContainers(Resource clusterResource,
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ List<RMContainer> skippedAMContainerlist, Resource resToObtain,
+ Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+ for (RMContainer c : skippedAMContainerlist) {
+ // Got required amount of resources for preemption, can stop now
+ if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+ Resources.none())) {
+ break;
+ }
+ // Once skippedAMSize drops to maxAMCapacityForThisQueue, stop selecting
+ // further AM containers for preemption.
+ if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
+ maxAMCapacityForThisQueue)) {
+ break;
+ }
+ Set<RMContainer> contToPrempt = preemptMap.get(c
+ .getApplicationAttemptId());
+ if (null == contToPrempt) {
+ contToPrempt = new HashSet<RMContainer>();
+ preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+ }
+ contToPrempt.add(c);
+
+ Resources.subtractFrom(resToObtain, c.getContainer().getResource());
+ Resources.subtractFrom(skippedAMSize, c.getContainer()
+ .getResource());
+ }
+ skippedAMContainerlist.clear();
}
/**
@@ -480,8 +538,9 @@ public class ProportionalCapacityPreempt
* @param rsrcPreempt
* @return
*/
- private Set<RMContainer> preemptFrom(
- FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+ private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
+ Resource clusterResource, Resource rsrcPreempt,
+ List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
Set<RMContainer> ret = new HashSet<RMContainer>();
ApplicationAttemptId appId = app.getApplicationAttemptId();
@@ -513,6 +572,12 @@ public class ProportionalCapacityPreempt
rsrcPreempt, Resources.none())) {
return ret;
}
+ // Skip AM Container from preemption for now.
+ if (c.isAMContainer()) {
+ skippedAMContainerlist.add(c);
+ Resources.addTo(skippedAMSize, c.getContainer().getResource());
+ continue;
+ }
ret.add(c);
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jul 2 01:54:47 2014
@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -832,7 +833,10 @@ public class RMAppAttemptImpl implements
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
- .get(0));
+ .get(0));
+ RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
+ .getRMContainer(appAttempt.getMasterContainer().getId());
+ rmMasterContainer.setAMContainer(true);
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.
// When AM container was allocated to RM itself, the node which allocates
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Wed Jul 2 01:54:47 2014
@@ -71,5 +71,7 @@ public interface RMContainer extends Eve
ContainerState getContainerState();
ContainerReport createContainerReport();
+
+ boolean isAMContainer();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Jul 2 01:54:47 2014
@@ -155,6 +155,7 @@ public class RMContainerImpl implements
private long creationTime;
private long finishTime;
private ContainerStatus finishedStatus;
+ private boolean isAMContainer;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -176,6 +177,7 @@ public class RMContainerImpl implements
this.rmContext = rmContext;
this.eventHandler = rmContext.getDispatcher().getEventHandler();
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
+ this.isAMContainer = false;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@@ -314,6 +316,25 @@ public class RMContainerImpl implements
}
@Override
+ public boolean isAMContainer() {
+ try {
+ readLock.lock();
+ return isAMContainer;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setAMContainer(boolean isAMContainer) {
+ try {
+ writeLock.lock();
+ this.isAMContainer = isAMContainer;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
public void handle(RMContainerEvent event) {
LOG.debug("Processing " + event.getContainerId() + " of type " +
event.getType());
try {
@@ -490,5 +511,4 @@ public class RMContainerImpl implements
}
return containerReport;
}
-
}
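The new flag is written from RM event-handling threads (at AM allocation in RMAppAttemptImpl above, and again on recovery in AbstractYarnScheduler below) and read from the preemption-monitor thread, hence the read/write-lock guard. A minimal sketch of the same guarded-flag pattern in isolation, using the common lock-before-try idiom (class name hypothetical):

import java.util.concurrent.locks.ReentrantReadWriteLock;

class GuardedAMFlag {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private boolean amContainer = false;

  boolean isAMContainer() {
    lock.readLock().lock();       // shared lock: many concurrent readers
    try {
      return amContainer;
    } finally {
      lock.readLock().unlock();
    }
  }

  void setAMContainer(boolean value) {
    lock.writeLock().lock();      // exclusive lock for the rare writes
    try {
      this.amContainer = value;
    } finally {
      lock.writeLock().unlock();
    }
  }
}

For a single independent flag a volatile boolean would arguably suffice; the lock form matches the convention RMContainerImpl already uses for the rest of its state.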
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Wed Jul 2 01:54:47 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
@@ -242,6 +243,20 @@ public abstract class AbstractYarnSchedu
// recover scheduler attempt
schedulerAttempt.recoverContainer(rmContainer);
+
+ // set master container for the current running AMContainer for this
+ // attempt.
+ RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+ if (appAttempt != null) {
+ Container masterContainer = appAttempt.getMasterContainer();
+
+ // Mark current running AMContainer's RMContainer based on the master
+ // container ID stored in AppAttempt.
+ if (masterContainer != null
+ && masterContainer.getId().equals(rmContainer.getContainerId())) {
+ ((RMContainerImpl)rmContainer).setAMContainer(true);
+ }
+ }
}
}
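On a work-preserving restart the RMContainer objects are rebuilt from NM-reported container statuses, so the AM flag set at allocation time is lost and has to be re-derived; the hunk above does that by comparing each recovered container's ID with the master container recorded on the current attempt. A simplified sketch of that check, with hypothetical stand-in types in place of the RM classes:

class RecoveryMarkSketch {

  static class RecoveredContainer {
    final long containerId;
    boolean amContainer;
    RecoveredContainer(long containerId) { this.containerId = containerId; }
  }

  // masterContainerId may be null when the attempt has no AM container yet
  // (e.g. it is still being scheduled), mirroring the null checks in the
  // real recovery path above.
  static void markIfMaster(RecoveredContainer c, Long masterContainerId) {
    if (masterContainerId != null
        && masterContainerId.longValue() == c.containerId) {
      c.amContainer = true;
    }
  }
}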
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Wed Jul 2 01:54:47 2014
@@ -62,6 +62,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -564,6 +565,43 @@ public class TestWorkPreservingRMRestart
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
}
+
+ @Test (timeout = 30000)
+ public void testAMContainerStatusWithRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1_1 = rm1.submitApp(1024);
+ MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+
+ RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt();
+ AbstractYarnScheduler scheduler =
+ ((AbstractYarnScheduler) rm1.getResourceScheduler());
+
+ Assert.assertTrue(scheduler.getRMContainer(
+ attempt0.getMasterContainer().getId()).isAMContainer());
+
+ // Re-start RM
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ List<NMContainerStatus> am1_1Containers =
+ createNMContainerStatusForApp(am1_1);
+ nm1.registerNode(am1_1Containers, null);
+
+ // Wait for the RM to settle down while recovering containers.
+ waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
+
+ scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler());
+ Assert.assertTrue(scheduler.getRMContainer(
+ attempt0.getMasterContainer().getId()).isAMContainer());
+ }
+
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Wed Jul 2 01:54:47 2014
@@ -80,6 +80,8 @@ public class TestProportionalCapacityPre
static final long TS = 3141592653L;
int appAlloc = 0;
+ boolean setAMContainer = false;
+ float setAMResourcePercent = 0.0f;
Random rand = null;
Clock mClock = null;
Configuration conf = null;
@@ -466,7 +468,108 @@ public class TestProportionalCapacityPre
fail("Failed to find SchedulingMonitor service, please check what
happened");
}
+
+ @Test
+ public void testSkipAMContainer() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 50, 50 }, // abs
+ { 100, 100, 100 }, // maxcap
+ { 100, 100, 0 }, // used
+ { 70, 20, 50 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 1, 1 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+
+ // By skipping AM Container, all other 24 containers of appD will be
+ // preempted
+ verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+ // By skipping AM Container, all other 24 containers of appC will be
+ // preempted
+ verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // Since the AM containers of appC and appD are saved, 2 containers from
+ // appB have to be preempted.
+ verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+ setAMContainer = false;
+ }
+
+ @Test
+ public void testPreemptSkippedAMContainers() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 10, 90 }, // abs
+ { 100, 100, 100 }, // maxcap
+ { 100, 100, 0 }, // used
+ { 70, 20, 90 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 5, 5 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+
+ // All 5 containers of appD will be preempted including AM container.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+ // All 5 containers of appC will be preempted including AM container.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // By skipping AM Container, all other 4 containers of appB will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+ // By skipping AM Container, all other 4 containers of appA will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ setAMContainer = false;
+ }
+
+ @Test
+ public void testAMResourcePercentForSkippedAMContainers() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 10, 90 }, // abs
+ { 100, 100, 100 }, // maxcap
+ { 100, 100, 0 }, // used
+ { 70, 20, 90 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 5, 5 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ setAMResourcePercent = 0.5f;
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+
+ // AMResourcePercent is 50% and queue A's capacity is 10GB, so
+ // maxAMCapacity will be 5GB. Total used AM container size is 20GB, hence
+ // 2 AM containers have to be preempted here.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+ // Including its AM container, all 5 containers of appC will be
+ // preempted.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // By skipping AM Container, all other 4 containers of appB will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+ // By skipping AM Container, all other 4 containers of appA will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ setAMContainer = false;
+ }
+
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@@ -583,6 +686,9 @@ public class TestProportionalCapacityPre
}
}
when(lq.getApplications()).thenReturn(qApps);
+ if (setAMResourcePercent != 0.0f) {
+   when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+ }
p.getChildQueues().add(lq);
return lq;
}
@@ -607,7 +713,11 @@ public class TestProportionalCapacityPre
List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) {
- cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+ if (setAMContainer && i == 0) {
+   cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
+ } else {
+   cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+ }
++cAlloc;
}
when(app.getLiveContainers()).thenReturn(cLive);
@@ -623,6 +733,10 @@ public class TestProportionalCapacityPre
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
+ when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+ if (0 == priority) {
+   when(mC.isAMContainer()).thenReturn(true);
+ }
return mC;
}
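The expectations in testAMResourcePercentForSkippedAMContainers follow from the cap computed in the policy, maxAMCapacityForThisQueue = clusterResource x absoluteCapacity x maxAMResourcePerQueuePercent. A throwaway back-of-the-envelope check of the numbers above (class name hypothetical; this assumes the qData figures are plain resource units and the natural-termination factor is 1.0):

public class AMCapCheck {
  public static void main(String[] args) {
    double cluster = 100;                  // root capacity in qData
    double absCapA = 10.0 / 100.0;         // queue A: abs = 10
    double amPercent = 0.5;                // setAMResourcePercent in the test

    // Cap on AM resources retained in queue A: 100 * 0.10 * 0.5 = 5
    double maxAMCap = cluster * absCapA * amPercent;

    double perContainer = 5;               // req granularity for queue A
    double nonAM = 4 * 4 * perContainer;   // 4 apps x 4 task containers = 80
    double resToObtain = 90;               // queue A is roughly 90 over its guarantee

    // After taking all 80 non-AM units, 10 are still owed: two AM containers.
    double fromAMs = resToObtain - nonAM;
    System.out.printf("maxAMCap=%.0f, AM containers preempted=%.0f%n",
        maxAMCap, fromAMs / perContainer); // maxAMCap=5, preempted=2
  }
}

Two AM containers (appD's and appC's, since apps are scanned in descending order) cover the remaining 10 units, which matches the times(5) expectations for appC and appD and times(4) for appA and appB.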
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1607227&r1=1607226&r2=1607227&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Wed Jul 2 01:54:47 2014
@@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -600,6 +602,9 @@ public class TestRMAppAttemptTransitions
any(List.class),
any(List.class))).
thenReturn(allocation);
+ RMContainer rmContainer = mock(RMContainerImpl.class);
+ when(scheduler.getRMContainer(container.getId())).
+ thenReturn(rmContainer);
applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent(