Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Jan 2 20:21:03 2014 @@ -78,7 +78,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -255,7 +257,12 @@ public class TestFairScheduler { private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplicationAttempt(id, queueId, userId); + scheduler.addApplication(id.getApplicationId(), queueId, userId); + // This conditional is for testAclSubmitApplication where app is rejected + // and no app is added. + if (scheduler.applications.containsKey(id.getApplicationId())) { + scheduler.addApplicationAttempt(id); + } List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, priority, numContainers, true); @@ -583,7 +590,7 @@ public class TestFairScheduler { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). getResourceUsage().getMemory()); - assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity RMNode node2 = @@ -599,10 +606,10 @@ public class TestFairScheduler { getResourceUsage().getMemory()); // The old reservation should still be there... - assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory()); // ... but it should disappear when we update the first node. scheduler.handle(updateEvent); - assertEquals(0, scheduler.applications.get(attId).getCurrentReservation().getMemory()); + assertEquals(0, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory()); } @@ -618,9 +625,13 @@ public class TestFairScheduler { null, null, null, false, false, 0, null, null), null, null, 0, null); appsMap.put(appAttemptId.getApplicationId(), rmApp); - AppAttemptAddedSchedulerEvent appAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user1"); + AppAddedSchedulerEvent appAddedEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", + "user1"); scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attempAddedEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + scheduler.handle(attempAddedEvent); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) @@ -639,10 +650,14 @@ public class TestFairScheduler { null, null, null, ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, null, null), null, null, 0, null); appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAttemptAddedSchedulerEvent appAddedEvent2 = - new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user2"); - scheduler.handle(appAddedEvent2); + + AppAddedSchedulerEvent appAddedEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", + "user2"); + scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attempAddedEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + scheduler.handle(attempAddedEvent); assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) @@ -660,8 +675,8 @@ public class TestFairScheduler { // submit app with empty queue ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - AppAttemptAddedSchedulerEvent appAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, "", "user1"); + AppAddedSchedulerEvent appAddedEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "", "user1"); scheduler.handle(appAddedEvent); // submission rejected @@ -695,7 +710,7 @@ public class TestFairScheduler { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appId; - Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications; + Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.appAttempts; List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>(); rules.add(new QueuePlacementRule.Specified().initialize(true, null)); @@ -786,11 +801,14 @@ public class TestFairScheduler { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplicationAttempt(id11, "root.queue1", "user1"); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); + scheduler.addApplicationAttempt(id11); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplicationAttempt(id21, "root.queue2", "user1"); + scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); + scheduler.addApplicationAttempt(id21); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplicationAttempt(id22, "root.queue2", "user1"); + scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); + scheduler.addApplicationAttempt(id22); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -831,11 +849,13 @@ public class TestFairScheduler { @Test public void testAppAdditionAndRemoval() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); - - AppAttemptAddedSchedulerEvent appAddedEvent1 = - new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), "default", - "user1"); - scheduler.handle(appAddedEvent1); + ApplicationAttemptId attemptId =createAppAttemptId(1, 1); + AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default", + "user1"); + scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attemptAddedEvent = + new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1)); + scheduler.handle(attemptAddedEvent); // Scheduler should have two queues (the default and the one created for user1) assertEquals(2, scheduler.getQueueManager().getLeafQueues().size()); @@ -1118,12 +1138,12 @@ public class TestFairScheduler { scheduler.handle(nodeUpdate3); } - assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app6).getLiveContainers().size()); // Now new requests arrive from queues C and D ApplicationAttemptId app7 = @@ -1146,16 +1166,16 @@ public class TestFairScheduler { // Make sure it is lowest priority container. scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size()); // First verify we are adding containers to preemption list for the application - assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(), - scheduler.applications.get(app3).getPreemptionContainers())); - assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(), - scheduler.applications.get(app6).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app3).getLiveContainers(), + scheduler.appAttempts.get(app3).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app6).getLiveContainers(), + scheduler.appAttempts.get(app6).getPreemptionContainers())); // Pretend 15 seconds have passed clock.tick(15); @@ -1165,8 +1185,8 @@ public class TestFairScheduler { Resources.createResource(2 * 1024)); // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size()); // Trigger a kill by insisting we want containers back scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), @@ -1180,22 +1200,22 @@ public class TestFairScheduler { scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); - assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1354,9 +1374,9 @@ public class TestFairScheduler { // One container should get reservation and the other should get nothing assertEquals(1024, - scheduler.applications.get(attId1).getCurrentReservation().getMemory()); + scheduler.appAttempts.get(attId1).getCurrentReservation().getMemory()); assertEquals(0, - scheduler.applications.get(attId2).getCurrentReservation().getMemory()); + scheduler.appAttempts.get(attId2).getCurrentReservation().getMemory()); } @Test (timeout = 5000) @@ -1391,7 +1411,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); // App 1 should be running - assertEquals(1, scheduler.applications.get(attId1).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(attId1).getLiveContainers().size()); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 1); @@ -1400,7 +1420,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); // App 2 should not be running - assertEquals(0, scheduler.applications.get(attId2).getLiveContainers().size()); + assertEquals(0, scheduler.appAttempts.get(attId2).getLiveContainers().size()); // Request another container for app 1 createSchedulingRequestExistingApplication(1024, 1, attId1); @@ -1409,7 +1429,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); // Request should be fulfilled - assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size()); + assertEquals(2, scheduler.appAttempts.get(attId1).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1429,10 +1449,10 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.applications.get(attId); + FSSchedulerApp app = scheduler.appAttempts.get(attId); assertEquals(1, app.getLiveContainers().size()); - ContainerId containerId = scheduler.applications.get(attId) + ContainerId containerId = scheduler.appAttempts.get(attId) .getLiveContainers().iterator().next().getContainerId(); // Cause reservation to be created @@ -1501,9 +1521,9 @@ public class TestFairScheduler { ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.applications.get(attId1); + FSSchedulerApp app1 = scheduler.appAttempts.get(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.applications.get(attId2); + FSSchedulerApp app2 = scheduler.appAttempts.get(attId2); assertNull("The application was allowed", app2); } @@ -1526,7 +1546,8 @@ public class TestFairScheduler { scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplicationAttempt(appId, "queue1", "user1"); + scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); + scheduler.addApplicationAttempt(appId); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -1545,14 +1566,14 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent1); // should assign node local - assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size()); + assertEquals(1, scheduler.appAttempts.get(appId).getLiveContainers().size()); // node 2 checks in scheduler.update(); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); // should assign rack local - assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size()); + assertEquals(2, scheduler.appAttempts.get(appId).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1571,8 +1592,8 @@ public class TestFairScheduler { "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.applications.get(attId1); - FSSchedulerApp app2 = scheduler.applications.get(attId2); + FSSchedulerApp app1 = scheduler.appAttempts.get(attId1); + FSSchedulerApp app2 = scheduler.appAttempts.get(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1612,7 +1633,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.applications.get(attId); + FSSchedulerApp app = scheduler.appAttempts.get(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1674,10 +1695,10 @@ public class TestFairScheduler { ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.applications.get(attId1); - FSSchedulerApp app2 = scheduler.applications.get(attId2); - FSSchedulerApp app3 = scheduler.applications.get(attId3); - FSSchedulerApp app4 = scheduler.applications.get(attId4); + FSSchedulerApp app1 = scheduler.appAttempts.get(attId1); + FSSchedulerApp app2 = scheduler.appAttempts.get(attId2); + FSSchedulerApp app3 = scheduler.appAttempts.get(attId3); + FSSchedulerApp app4 = scheduler.appAttempts.get(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1764,7 +1785,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplicationAttempt(attId, queue, user); + scheduler.addApplication(attId.getApplicationId(), queue, user); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -1792,7 +1813,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.applications.get(attId); + FSSchedulerApp app = scheduler.appAttempts.get(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -1861,7 +1882,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.applications.get(attId1); + FSSchedulerApp app = scheduler.appAttempts.get(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1900,7 +1921,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.applications.get(attId1); + FSSchedulerApp app = scheduler.appAttempts.get(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1933,7 +1954,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.applications.get(attId); + FSSchedulerApp app = scheduler.appAttempts.get(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -1973,7 +1994,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.applications.get(attId); + FSSchedulerApp app = scheduler.appAttempts.get(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -1993,10 +2014,10 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.applications.get(appAttId1); + FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.applications.get(appAttId2); + FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2034,13 +2055,13 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.applications.get(appAttId1); + FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.applications.get(appAttId2); + FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.applications.get(appAttId3); + FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2071,19 +2092,19 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.applications.get(appAttId1); + FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.applications.get(appAttId2); + FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.applications.get(appAttId3); + FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.applications.get(appAttId4); + FSSchedulerApp app4 = scheduler.appAttempts.get(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2163,7 +2184,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.applications.get(attId1); + FSSchedulerApp app = scheduler.appAttempts.get(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2178,12 +2199,12 @@ public class TestFairScheduler { public void testConcurrentAccessOnApplications() throws Exception { FairScheduler fs = new FairScheduler(); TestCapacityScheduler.verifyConcurrentAccessOnApplications( - fs.applications, FSSchedulerApp.class, FSLeafQueue.class); + fs.appAttempts, FSSchedulerApp.class, FSLeafQueue.class); } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.applications.get(attId); + FSSchedulerApp app = scheduler.appAttempts.get(attId); FSLeafQueue queue = app.getQueue(); Collection<AppSchedulable> runnableApps = queue.getRunnableAppSchedulables(); @@ -2356,7 +2377,8 @@ public class TestFairScheduler { // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplicationAttempt(appAttemptId, "queue11", "user11"); + fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); + fs.addApplicationAttempt(appAttemptId); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); @@ -2367,7 +2389,7 @@ public class TestFairScheduler { // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.applications.get(appAttemptId); + FSSchedulerApp app = fs.appAttempts.get(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -2455,7 +2477,7 @@ public class TestFairScheduler { ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.applications.get(appAttemptId); + FSSchedulerApp app = scheduler.appAttempts.get(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), @@ -2465,7 +2487,7 @@ public class TestFairScheduler { scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), null, Collections.singletonList(host)); - assertFalse(scheduler.applications.get(appAttemptId).isBlacklisted(host)); + assertFalse(scheduler.appAttempts.get(appAttemptId).isBlacklisted(host)); List<ResourceRequest> update = Arrays.asList( createResourceRequest(GB, node.getHostName(), 1, 0, true)); @@ -2527,4 +2549,12 @@ public class TestFairScheduler { assertTrue(appAttIds.contains(appAttId1)); assertTrue(appAttIds.contains(appAttId2)); } + + @Test + public void testAddAndRemoveAppFromFairScheduler() throws Exception { + FairScheduler scheduler = + (FairScheduler) resourceManager.getResourceScheduler(); + TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( + scheduler.applications, scheduler, "default"); + } }
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Thu Jan 2 20:21:03 2014 @@ -61,13 +61,16 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -150,14 +153,21 @@ public class TestFifoScheduler { ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); - SchedulerEvent event = - new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user"); - schedular.handle(event); + SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user"); + schedular.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + schedular.handle(attemptEvent); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); - event = new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user"); - schedular.handle(event); + SchedulerEvent appEvent2 = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue", + "user"); + schedular.handle(appEvent2); + SchedulerEvent attemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId); + schedular.handle(attemptEvent2); int afterAppsSubmitted = metrics.getAppsSubmitted(); Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted); @@ -188,9 +198,13 @@ public class TestFifoScheduler { int _appAttemptId = 1; ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, _appAttemptId); - AppAttemptAddedSchedulerEvent appEvent1 = - new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1"); - scheduler.handle(appEvent1); + AppAddedSchedulerEvent appEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", + "user1"); + scheduler.handle(appEvent); + AppAttemptAddedSchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + scheduler.handle(attemptEvent); int memory = 64; int nConts = 3; @@ -274,9 +288,13 @@ public class TestFifoScheduler { int _appAttemptId = 1; ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, _appAttemptId); - AppAttemptAddedSchedulerEvent appEvent1 = - new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1"); - scheduler.handle(appEvent1); + AppAddedSchedulerEvent appEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", + "user1"); + scheduler.handle(appEvent); + AppAttemptAddedSchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + scheduler.handle(attemptEvent); int memory = 1024; int priority = 1; @@ -520,7 +538,7 @@ public class TestFifoScheduler { public void testConcurrentAccessOnApplications() throws Exception { FifoScheduler fs = new FifoScheduler(); TestCapacityScheduler.verifyConcurrentAccessOnApplications( - fs.applications, FiCaSchedulerApp.class, Queue.class); + fs.appAttempts, FiCaSchedulerApp.class, Queue.class); } @SuppressWarnings("resource") @@ -541,9 +559,13 @@ public class TestFifoScheduler { ApplicationId appId = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); - SchedulerEvent event = - new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user"); - fs.handle(event); + SchedulerEvent appEvent = + new AppAddedSchedulerEvent(appId, "default", + "user"); + fs.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + fs.handle(attemptEvent); // Verify the blacklist can be updated independent of requesting containers fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), @@ -575,6 +597,17 @@ public class TestFifoScheduler { Assert.assertNull(scheduler.getAppsInQueue("someotherqueue")); } + @Test + public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); + TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(fs.applications, + fs, "queue"); + } + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory()); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java Thu Jan 2 20:21:03 2014 @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; @@ -1392,6 +1393,8 @@ public class TestRMWebServicesApps exten MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048); RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1"); amNodeManager.nodeHeartbeat(true); + rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.ALLOCATED); int maxAppAttempts = rm.getConfig().getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -1405,6 +1408,8 @@ public class TestRMWebServicesApps exten rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); amNodeManager.nodeHeartbeat(true); } + rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.ALLOCATED); assertEquals("incorrect number of attempts", maxAppAttempts, app1.getAppAttempts().values().size()); testAppAttemptsHelper(app1.getApplicationId().toString(), app1,