YARN-3887. Support changing Application priority during runtime. Contributed by Sunil G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa1d84ae Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa1d84ae Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa1d84ae Branch: refs/heads/YARN-1197 Commit: fa1d84ae2739a1e76f58b9c96d1378f9453cc0d2 Parents: b56daff Author: Jian He <jia...@apache.org> Authored: Mon Aug 10 20:51:54 2015 -0700 Committer: Jian He <jia...@apache.org> Committed: Mon Aug 10 20:51:54 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/recovery/RMStateStore.java | 5 + .../scheduler/AbstractYarnScheduler.java | 7 + .../scheduler/SchedulerApplicationAttempt.java | 2 +- .../scheduler/YarnScheduler.java | 11 + .../scheduler/capacity/CapacityScheduler.java | 49 ++++ .../AbstractComparatorOrderingPolicy.java | 6 + .../capacity/TestApplicationPriority.java | 260 +++++++++++++++++++ 8 files changed, 342 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5e27a2f..ada1056 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.8.0 - UNRELEASED YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy. (Sunil G via wangda) + YARN-3887. Support changing Application priority during runtime. (Sunil G + via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5036450..affbee1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -706,6 +706,11 @@ public abstract class RMStateStore extends AbstractService { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } + public void updateApplicationStateSynchronously( + ApplicationStateData appState) { + handleStoreEvent(new RMStateUpdateAppEvent(appState)); + } + public void updateFencedState() { handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d69600a..ed05189 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -701,4 +701,11 @@ public abstract class AbstractYarnScheduler // specific scheduler. return Priority.newInstance(0); } + + @Override + public void updateApplicationPriority(Priority newPriority, + ApplicationId applicationId) throws YarnException { + // Dummy Implementation till Application Priority changes are done in + // specific scheduler. + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 317e61c..4872543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -98,7 +98,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private boolean amRunning = false; private LogAggregationContext logAggregationContext; - private Priority appPriority = null; + private volatile Priority appPriority = null; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index f629579..0fa23e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -306,4 +306,15 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException; + + /** + * + * Change application priority of a submitted application at runtime + * + * @param newPriority Submitted Application priority. + * + * @param applicationId Application ID + */ + public void updateApplicationPriority(Priority newPriority, + ApplicationId applicationId) throws YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index b4d0095..b4b1383 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -1850,4 +1851,52 @@ public class CapacityScheduler extends public Priority getMaxClusterLevelAppPriority() { return maxClusterLevelAppPriority; } + + @Override + public synchronized void updateApplicationPriority(Priority newPriority, + ApplicationId applicationId) throws YarnException { + Priority appPriority = null; + SchedulerApplication<FiCaSchedulerApp> application = applications + .get(applicationId); + + if (application == null) { + throw new YarnException("Application '" + applicationId + + "' is not present, hence could not change priority."); + } + + if (application.getPriority().equals(newPriority)) { + return; + } + + RMApp rmApp = rmContext.getRMApps().get(applicationId); + appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), + rmApp.getQueue(), applicationId); + + // Update new priority in Submission Context to keep track in HA + rmApp.getApplicationSubmissionContext().setPriority(appPriority); + + // Update to state store + ApplicationStateData appState = ApplicationStateData.newInstance( + rmApp.getSubmitTime(), rmApp.getStartTime(), + rmApp.getApplicationSubmissionContext(), rmApp.getUser()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState); + + // As we use iterator over a TreeSet for OrderingPolicy, once we change + // priority then reinsert back to make order correct. + LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); + synchronized (queue) { + queue.getOrderingPolicy().removeSchedulableEntity( + application.getCurrentAppAttempt()); + + // Update new priority in SchedulerApplication + application.setPriority(appPriority); + + queue.getOrderingPolicy().addSchedulableEntity( + application.getCurrentAppAttempt()); + } + + LOG.info("Priority '" + appPriority + "' is updated in queue :" + + rmApp.getQueue() + "for application:" + applicationId + + "for the user: " + rmApp.getUser()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index c4d2aae..7bec03a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -94,11 +94,17 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti @Override public void addSchedulableEntity(S s) { + if (null == s) { + return; + } schedulableEntities.add(s); } @Override public boolean removeSchedulableEntity(S s) { + if (null == s) { + return false; + } synchronized (entitiesToReorder) { entitiesToReorder.remove(s.getId()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index db094e3..169e9f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -307,4 +312,259 @@ public class TestApplicationPriority { maxPriority); rm.stop(); } + + @Test + public void testUpdatePriorityAtRuntime() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + + // Verify whether the new priority is updated + Assert.assertEquals(appPriority2, schedulerAppAttempt.getPriority()); + } + + @Test + public void testUpdateInvalidPriorityAtRuntime() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Change the priority of App1 to 15 + Priority appPriority2 = Priority.newInstance(15); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + + // Verify whether priority 15 is reset to 10 + Priority appPriority3 = Priority.newInstance(10); + Assert.assertEquals(appPriority3, schedulerAppAttempt.getPriority()); + rm.stop(); + } + + @Test(timeout = 180000) + public void testRMRestartWithChangeInPriority() throws Exception { + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + false); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map<ApplicationId, ApplicationStateData> rmAppState = rmState + .getApplicationState(); + + // PHASE 1: create state in an RM + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, + rm1.getResourceTrackerService()); + nm1.registerNode(); + + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = rm1.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm1, nm1); + am1.registerAppAttempt(); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + + // let things settle down + Thread.sleep(1000); + + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore); + + // start new RM + rm2.start(); + // change NM to point to new RM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // Verify RM Apps after this restart + Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); + + // get scheduler app + RMApp loadedApp = rm2.getRMContext().getRMApps() + .get(app1.getApplicationId()); + + // Verify whether priority 15 is reset to 10 + Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt() + .getSubmissionContext().getPriority()); + + rm2.stop(); + rm1.stop(); + } + + @Test + public void testApplicationPriorityAllocationWithChangeInPriority() + throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // add request for containers and wait for containers to be allocated. + int NUM_CONTAINERS = 7; + List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1", + NUM_CONTAINERS, 2 * GB, nm1); + + Assert.assertEquals(7, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); + + // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory()); + + // Submit the second app App2 with priority 8 (Higher than App1) + Priority appPriority2 = Priority.newInstance(8); + RMApp app2 = rm.submitApp(1 * GB, appPriority2); + + // kick the scheduler, 1 GB which was free is given to AM of App2 + nm1.nodeHeartbeat(true); + MockAM am2 = MockRM.launchAM(app2, rm, nm1); + am2.registerAppAttempt(); + + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttemptApp1 = cs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + // kill 2 containers to free up some space + int counter = 0; + for (Iterator<Container> iterator = allocated1.iterator(); iterator + .hasNext();) { + Container c = iterator.next(); + if (++counter > 2) { + break; + } + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + iterator.remove(); + } + + // check node report, 12 GB used and 4 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory()); + + // add request for containers App1 + am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>()); + + // add request for containers App2 and wait for containers to get allocated + List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1", + 2, 2 * GB, nm1); + + Assert.assertEquals(2, allocated2.size()); + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // kill 1 more + counter = 0; + for (Iterator<Container> iterator = allocated1.iterator(); iterator + .hasNext();) { + Container c = iterator.next(); + if (++counter > 1) { + break; + } + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + iterator.remove(); + } + + // check node report, 14 GB used and 2 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(14 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory()); + + // Change the priority of App1 to 3 (lowest) + Priority appPriority3 = Priority.newInstance(3); + cs.updateApplicationPriority(appPriority3, app2.getApplicationId()); + + // add request for containers App2 + am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>()); + + // add request for containers App1 and wait for containers to get allocated + // since priority is more for App1 now, App1 will get a container. + List<Container> allocated3 = am1.allocateAndWaitForContainers("127.0.0.1", + 1, 2 * GB, nm1); + + Assert.assertEquals(1, allocated3.size()); + // Now App1 will have 5 containers and 1 AM. App2 will have 2 containers. + Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size()); + rm.stop(); + } }