YARN-5136. Error in handling event type APP_ATTEMPT_REMOVED to the scheduler (Contributed by Wilfred Spiegelenburg via Daniel Templeton)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9f5d2c4f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9f5d2c4f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9f5d2c4f Branch: refs/heads/YARN-5972 Commit: 9f5d2c4fff6d31acc8b422b52462ef4927c4eea1 Parents: ab923a5 Author: Daniel Templeton <templ...@apache.org> Authored: Wed Dec 7 11:12:14 2016 -0800 Committer: Daniel Templeton <templ...@apache.org> Committed: Wed Dec 7 11:12:14 2016 -0800 ---------------------------------------------------------------------- .../scheduler/fair/FairScheduler.java | 32 +++++-- .../scheduler/fair/TestFairScheduler.java | 89 ++++++++++++++++++++ 2 files changed, 115 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f5d2c4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 03df5d4..e790bc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -633,8 +633,7 @@ public class FairScheduler extends RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { try { writeLock.lock(); - LOG.info( - "Application " + applicationAttemptId + " is done." + " finalState=" + LOG.info("Application " + applicationAttemptId + " is done. finalState=" + rmAppAttemptFinalState); FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId); @@ -644,6 +643,13 @@ public class FairScheduler extends return; } + // Check if the attempt is already stopped and don't stop it twice. + if (attempt.isStopped()) { + LOG.info("Application " + applicationAttemptId + " has already been " + + "stopped!"); + return; + } + // Release all the running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals( @@ -1521,6 +1527,13 @@ public class FairScheduler extends try { attempt.getWriteLock().lock(); FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); + // Check if the attempt is already stopped: don't move stopped app + // attempt. The attempt has already been removed from all queues. + if (attempt.isStopped()) { + LOG.info("Application " + appId + " is stopped and can't be moved!"); + throw new YarnException("Application " + appId + + " is stopped and can't be moved!"); + } String destQueueName = handleMoveToPlanQueue(queueName); FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); if (targetQueue == null) { @@ -1617,16 +1630,23 @@ public class FairScheduler extends * operations will be atomic. */ private void executeMove(SchedulerApplication<FSAppAttempt> app, - FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { - boolean wasRunnable = oldQueue.removeApp(attempt); + FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) + throws YarnException { + // Check current runs state. Do not remove the attempt from the queue until + // after the check has been performed otherwise it could remove the app + // from a queue without moving it to a new queue. + boolean wasRunnable = oldQueue.isRunnableApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, attempt); if (wasRunnable && !nowRunnable) { - throw new IllegalStateException("Should have already verified that app " + throw new YarnException("Should have already verified that app " + attempt.getApplicationId() + " would be runnable in new queue"); } - + + // Now it is safe to remove from the queue. + oldQueue.removeApp(attempt); + if (wasRunnable) { maxRunningEnforcer.untrackRunnableApp(attempt); } else if (nowRunnable) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f5d2c4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 55f8849..5aa1e2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4552,6 +4552,95 @@ public class TestFairScheduler extends FairSchedulerTestBase { } @Test + public void testDoubleRemoval() throws Exception { + String testUser = "user1"; // convenience var + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + ApplicationAttemptId attemptId = createAppAttemptId(1, 1); + // The placement rule will add the app to the user based queue but the + // passed in queue must exist. + AppAddedSchedulerEvent appAddedEvent = + new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser, + testUser); + scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attemptAddedEvent = + new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false); + scheduler.handle(attemptAddedEvent); + + // Get a handle on the attempt. + FSAppAttempt attempt = scheduler.getSchedulerApp(attemptId); + + AppAttemptRemovedSchedulerEvent attemptRemovedEvent = + new AppAttemptRemovedSchedulerEvent(createAppAttemptId(1, 1), + RMAppAttemptState.FINISHED, false); + + // Make sure the app attempt is in the queue. + List<ApplicationAttemptId> attemptList = + scheduler.getAppsInQueue(testUser); + assertNotNull("Queue missing", attemptList); + assertTrue("Attempt should be in the queue", + attemptList.contains(attemptId)); + assertFalse("Attempt is stopped", attempt.isStopped()); + + // Now remove the app attempt + scheduler.handle(attemptRemovedEvent); + // The attempt is not in the queue, and stopped + attemptList = scheduler.getAppsInQueue(testUser); + assertFalse("Attempt should not be in the queue", + attemptList.contains(attemptId)); + assertTrue("Attempt should have been stopped", attempt.isStopped()); + + // Now remove the app attempt again, since it is stopped nothing happens. + scheduler.handle(attemptRemovedEvent); + // The attempt should still show the original queue info. + assertTrue("Attempt queue has changed", + attempt.getQueue().getName().endsWith(testUser)); + } + + @Test (expected = YarnException.class) + public void testMoveAfterRemoval() throws Exception { + String testUser = "user1"; // convenience var + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + ApplicationAttemptId attemptId = createAppAttemptId(1, 1); + AppAddedSchedulerEvent appAddedEvent = + new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser, + testUser); + scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attemptAddedEvent = + new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false); + scheduler.handle(attemptAddedEvent); + + // Get a handle on the attempt. + FSAppAttempt attempt = scheduler.getSchedulerApp(attemptId); + + AppAttemptRemovedSchedulerEvent attemptRemovedEvent = + new AppAttemptRemovedSchedulerEvent(createAppAttemptId(1, 1), + RMAppAttemptState.FINISHED, false); + + // Remove the app attempt + scheduler.handle(attemptRemovedEvent); + // Make sure the app attempt is not in the queue and stopped. + List<ApplicationAttemptId> attemptList = + scheduler.getAppsInQueue(testUser); + assertNotNull("Queue missing", attemptList); + assertFalse("Attempt should not be in the queue", + attemptList.contains(attemptId)); + assertTrue("Attempt should have been stopped", attempt.isStopped()); + // The attempt should still show the original queue info. + assertTrue("Attempt queue has changed", + attempt.getQueue().getName().endsWith(testUser)); + + // Now move the app: not using an event since there is none + // in the scheduler. This should throw. + scheduler.moveApplication(attemptId.getApplicationId(), "default"); + } + + @Test public void testPerfMetricsInited() { scheduler.init(conf); scheduler.start(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org