YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d138804e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d138804e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d138804e Branch: refs/heads/HDFS-7285 Commit: d138804e49735995653a37efa19589f9cdf13879 Parents: 521a196 Author: Karthik Kambatla <ka...@apache.org> Authored: Wed Mar 4 18:06:36 2015 -0800 Committer: Jing Zhao <ji...@apache.org> Committed: Mon Mar 9 13:11:24 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FairScheduler.java | 1 + .../scheduler/fair/MaxRunningAppsEnforcer.java | 40 ++- .../scheduler/fair/TestFairScheduler.java | 310 ++++++++++++++++++- 4 files changed, 348 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d138804e/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0b71bee..9a52325 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -697,6 +697,9 @@ Release 2.7.0 - UNRELEASED YARN-3131. YarnClientImpl should check FAILED and KILLED state in submitApplication (Chang Li via jlowe) + + YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending + jobs. 
(Siqi Li via kasha) Release 2.6.0 - 2014-11-18 http://git-wip-us.apache.org/repos/asf/hadoop/blob/d138804e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2b59716..e8a9555 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1477,6 +1477,7 @@ public class FairScheduler extends allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); + maxRunningEnforcer.updateRunnabilityOnReload(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d138804e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 2c90edd..f750438 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -105,6 +105,26 @@ public class MaxRunningAppsEnforcer { } /** + * This is called after reloading the allocation configuration when the + * scheduler is reinitialized + * + * Checks to see whether any non-runnable applications become runnable + * now that the max running apps of given queue has been changed + * + * Runs in O(n) where n is the number of apps that are non-runnable and in + * the queues that went from having no slack to having slack. + */ + public void updateRunnabilityOnReload() { + FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue(); + List<List<FSAppAttempt>> appsNowMaybeRunnable = + new ArrayList<List<FSAppAttempt>>(); + + gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable); + + updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE); + } + + /** * Checks to see whether any other applications runnable now that the given * application has been removed from the given queue. And makes them so. 
* @@ -156,6 +176,19 @@ public class MaxRunningAppsEnforcer { } } + updateAppsRunnability(appsNowMaybeRunnable, + appsNowMaybeRunnable.size()); + } + + /** + * Checks to see whether applications are runnable now by iterating + * through each one of them and checking whether the queue and user have slack + * + * If we know how many apps can be runnable, there is no need to iterate + * through all apps; maxRunnableApps is used to break out of the iteration + */ + private void updateAppsRunnability(List<List<FSAppAttempt>> + appsNowMaybeRunnable, int maxRunnableApps) { // Scan through and check whether this means that any apps are now runnable Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); @@ -173,9 +206,7 @@ public class MaxRunningAppsEnforcer { next.getQueue().addApp(appSched, true); noLongerPendingApps.add(appSched); - // No more than one app per list will be able to be made runnable, so - // we can stop looking after we've found that many - if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) { + if (noLongerPendingApps.size() >= maxRunnableApps) { break; } } @@ -194,11 +225,10 @@ public class MaxRunningAppsEnforcer { if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " - + "usersNonRunnableApps, but was not. This should never happen."); + + "usersNonRunnableApps, but was not. This should never happen."); } } } - /** * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d138804e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index c29dbfc..9fadba9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2288,7 +2288,315 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Request should be fulfilled assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); } - + + @Test (timeout = 5000) + public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "<?xml version=\"1.0\"?>" + + "<allocations>" + + "<queue name=\"root\">" + + "<queue name=\"queue1\">" + + "<maxRunningApps>1</maxRunningApps>" + + "</queue>" + + "</queue>" + + "</allocations>"; + + String allocAfter = "<?xml version=\"1.0\"?>" + + "<allocations>" + + "<queue name=\"root\">" + + "<queue name=\"queue1\">" + + "<maxRunningApps>3</maxRunningApps>" + + "</queue>" + + "</queue>" + + "</allocations>"; + + testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + @Test (timeout = 5000) + public void 
testIncreaseUserMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "<?xml version=\"1.0\"?>"+ + "<allocations>"+ + "<queue name=\"root\">"+ + "<queue name=\"queue1\">"+ + "<maxRunningApps>10</maxRunningApps>"+ + "</queue>"+ + "</queue>"+ + "<user name=\"user1\">"+ + "<maxRunningApps>1</maxRunningApps>"+ + "</user>"+ + "</allocations>"; + + String allocAfter = "<?xml version=\"1.0\"?>"+ + "<allocations>"+ + "<queue name=\"root\">"+ + "<queue name=\"queue1\">"+ + "<maxRunningApps>10</maxRunningApps>"+ + "</queue>"+ + "</queue>"+ + "<user name=\"user1\">"+ + "<maxRunningApps>3</maxRunningApps>"+ + "</user>"+ + "</allocations>"; + + testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore, + String allocAfter) throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocBefore); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); 
+ scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocAfter); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + + @Test (timeout = 5000) + public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "<?xml version=\"1.0\"?>" + + "<allocations>" + + "<queue name=\"root\">" + + "<queue name=\"queue1\">" + + "<maxRunningApps>3</maxRunningApps>" + + "</queue>" + + "</queue>" + + "</allocations>"; + + String allocAfter = "<?xml version=\"1.0\"?>" + + "<allocations>" + + "<queue name=\"root\">" + + "<queue 
name=\"queue1\">" + + "<maxRunningApps>1</maxRunningApps>" + + "</queue>" + + "</queue>" + + "</allocations>"; + + testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + @Test (timeout = 5000) + public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "<?xml version=\"1.0\"?>"+ + "<allocations>"+ + "<queue name=\"root\">"+ + "<queue name=\"queue1\">"+ + "<maxRunningApps>10</maxRunningApps>"+ + "</queue>"+ + "</queue>"+ + "<user name=\"user1\">"+ + "<maxRunningApps>3</maxRunningApps>"+ + "</user>"+ + "</allocations>"; + + String allocAfter = "<?xml version=\"1.0\"?>"+ + "<allocations>"+ + "<queue name=\"root\">"+ + "<queue name=\"queue1\">"+ + "<maxRunningApps>10</maxRunningApps>"+ + "</queue>"+ + "</queue>"+ + "<user name=\"user1\">"+ + "<maxRunningApps>1</maxRunningApps>"+ + "</user>"+ + "</allocations>"; + + testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore, + String allocAfter) throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocBefore); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 
= createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocAfter); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should still be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should still be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 2 + appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId2, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + 
scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 3 + appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId3, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running now + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { scheduler.init(conf);