YARN-8191. Fair scheduler: queue deletion without RM restart. (Gergo Repas via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86bc6425 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86bc6425 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86bc6425 Branch: refs/heads/YARN-7402 Commit: 86bc6425d425913899f1d951498bd040e453b3d0 Parents: d9852eb Author: Haibo Chen <haiboc...@apache.org> Authored: Thu May 24 17:07:21 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Thu May 24 17:12:34 2018 -0700 ---------------------------------------------------------------------- .../fair/AllocationFileLoaderService.java | 16 +- .../scheduler/fair/FSLeafQueue.java | 31 ++ .../resourcemanager/scheduler/fair/FSQueue.java | 9 + .../scheduler/fair/FairScheduler.java | 29 +- .../scheduler/fair/QueueManager.java | 155 +++++++-- .../fair/TestAllocationFileLoaderService.java | 100 +++--- .../scheduler/fair/TestQueueManager.java | 337 +++++++++++++++++++ 7 files changed, 596 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index d8d9051..7a40b6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -87,7 +87,7 @@ public class AllocationFileLoaderService extends AbstractService { private Path allocFile; private FileSystem fs; - private Listener reloadListener; + private final Listener reloadListener; @VisibleForTesting long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; @@ -95,15 +95,16 @@ public class AllocationFileLoaderService extends AbstractService { private Thread reloadThread; private volatile boolean running = true; - public AllocationFileLoaderService() { - this(SystemClock.getInstance()); + public AllocationFileLoaderService(Listener reloadListener) { + this(reloadListener, SystemClock.getInstance()); } private List<Permission> defaultPermissions; - public AllocationFileLoaderService(Clock clock) { + public AllocationFileLoaderService(Listener reloadListener, Clock clock) { super(AllocationFileLoaderService.class.getName()); this.clock = clock; + this.reloadListener = reloadListener; } @Override @@ -114,6 +115,7 @@ public class AllocationFileLoaderService extends AbstractService { reloadThread = new Thread(() -> { while (running) { try { + reloadListener.onCheck(); long time = clock.getTime(); long lastModified = fs.getFileStatus(allocFile).getModificationTime(); @@ -207,10 +209,6 @@ public class AllocationFileLoaderService extends AbstractService { return allocPath; } - public synchronized void setReloadListener(Listener reloadListener) { - this.reloadListener = reloadListener; - } - /** * Updates the allocation list from the allocation config file. This file is * expected to be in the XML format specified in the design doc. @@ -351,5 +349,7 @@ public class AllocationFileLoaderService extends AbstractService { public interface Listener { void onReload(AllocationConfiguration info) throws IOException; + + void onCheck(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49d2166..e7da16f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -34,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,6 +59,8 @@ public class FSLeafQueue extends FSQueue { // apps that are runnable private final List<FSAppAttempt> runnableApps = new ArrayList<>(); private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>(); + // assignedApps keeps track of applications that have no appAttempts + private final Set<ApplicationId> assignedApps = new HashSet<>(); // get a lock with fair distribution for app list updates private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); private final Lock readLock = rwl.readLock(); @@ -89,6 +94,9 @@ public class FSLeafQueue extends FSQueue { } else { nonRunnableApps.add(app); } + // when an appAttempt is created for an application, we'd like to move + // it over from assignedApps to either runnableApps or nonRunnableApps + assignedApps.remove(app.getApplicationId()); incUsedResource(app.getResourceUsage()); } finally { writeLock.unlock(); @@ -440,6 +448,15 @@ public class FSLeafQueue extends FSQueue { return numPendingApps; } + public int getNumAssignedApps() { + readLock.lock(); + try { + return assignedApps.size(); + } finally { + readLock.unlock(); + } + } + /** * TODO: Based on how frequently this is called, we might want to club * counting pending and active apps in the same method. @@ -609,4 +626,18 @@ public class FSLeafQueue extends FSQueue { ", LastTimeAtMinShare: " + lastTimeAtMinShare + "}"); } + + /** + * This method is called when an application is assigned to this queue + * for book-keeping purposes (to be able to determine if the queue is empty). + * @param applicationId the application's id + */ + public void addAssignedApp(ApplicationId applicationId) { + writeLock.lock(); + try { + assignedApps.add(applicationId); + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 4babfd5..6b88a32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -83,6 +83,7 @@ public abstract class FSQueue implements Queue, Schedulable { private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; + private boolean isDynamic = true; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -585,4 +586,12 @@ public abstract class FSQueue implements Queue, Schedulable { * @param sb the {code StringBuilder} which holds queue states */ protected abstract void dumpStateInternal(StringBuilder sb); + + public boolean isDynamic() { + return isDynamic; + } + + public void setDynamic(boolean dynamic) { + this.isDynamic = dynamic; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1c4bd51..4c84aa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashSet; @@ -207,7 +208,8 @@ public class FairScheduler extends public FairScheduler() { super(FairScheduler.class.getName()); context = new FSContext(this); - allocsLoader = new AllocationFileLoaderService(); + allocsLoader = + new AllocationFileLoaderService(new AllocationReloadListener()); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } @@ -516,6 +518,7 @@ public class FairScheduler extends new SchedulerApplication<FSAppAttempt>(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); + queue.addAssignedApp(applicationId); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queue.getName() @@ -1435,7 +1438,6 @@ public class FairScheduler extends } allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); // If we fail to load allocations file on initialize, we want to fail // immediately. After a successful load, exceptions on future reloads // will just result in leaving things as they are. @@ -1589,6 +1591,7 @@ public class FairScheduler extends // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. + Set<String> removedStaticQueues = getRemovedStaticQueues(queueInfo); writeLock.lock(); try { if (queueInfo == null) { @@ -1599,6 +1602,7 @@ public class FairScheduler extends setQueueAcls(allocConf.getQueueAcls()); allocConf.getDefaultSchedulingPolicy().initialize(getContext()); queueMgr.updateAllocationConfiguration(allocConf); + queueMgr.setQueuesToDynamic(removedStaticQueues); applyChildDefaults(); maxRunningEnforcer.updateRunnabilityOnReload(); } @@ -1606,6 +1610,27 @@ public class FairScheduler extends writeLock.unlock(); } } + + private Set<String> getRemovedStaticQueues( + AllocationConfiguration queueInfo) { + if (queueInfo == null || allocConf == null) { + return Collections.emptySet(); + } + Set<String> removedStaticQueues = new HashSet<>(); + for (Set<String> queues : allocConf.getConfiguredQueues().values()) { + removedStaticQueues.addAll(queues); + } + for (Set<String> queues : queueInfo.getConfiguredQueues().values()) { + removedStaticQueues.removeAll(queues); + } + return removedStaticQueues; + } + + @Override + public void onCheck() { + queueMgr.removeEmptyDynamicQueues(); + queueMgr.removePendingIncompatibleQueues(); + } } private void setQueueAcls( http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 8734877..632a842 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -22,13 +22,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import javax.xml.parsers.ParserConfigurationException; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -52,6 +56,36 @@ public class QueueManager { public static final Log LOG = LogFactory.getLog( QueueManager.class.getName()); + private final class IncompatibleQueueRemovalTask { + + private final String queueToCreate; + private final FSQueueType queueType; + + private IncompatibleQueueRemovalTask(String queueToCreate, + FSQueueType queueType) { + this.queueToCreate = queueToCreate; + this.queueType = queueType; + } + + private void execute() { + Boolean removed = + removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null); + if (Boolean.TRUE.equals(removed)) { + FSQueue queue = getQueue(queueToCreate, true, queueType, false); + if (queue != null && + // if queueToCreate is present in the allocation config, set it + // to static + scheduler.allocConf.configuredQueues.values().stream() + .anyMatch(s -> s.contains(queueToCreate))) { + queue.setDynamic(false); + } + } + if (!Boolean.FALSE.equals(removed)) { + incompatibleQueuesPendingRemoval.remove(this); + } + } + } + public static final String ROOT_QUEUE = "root"; private final FairScheduler scheduler; @@ -59,6 +93,8 @@ public class QueueManager { private final Collection<FSLeafQueue> leafQueues = new CopyOnWriteArrayList<FSLeafQueue>(); private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>(); + private Set<IncompatibleQueueRemovalTask> incompatibleQueuesPendingRemoval = + new HashSet<>(); private FSParentQueue rootQueue; public QueueManager(FairScheduler scheduler) { @@ -75,10 +111,13 @@ public class QueueManager { // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been // loaded yet. rootQueue = new FSParentQueue("root", scheduler, null); + rootQueue.setDynamic(false); queues.put(rootQueue.getName(), rootQueue); // Create the default queue - getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); + FSLeafQueue defaultQueue = + getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); + defaultQueue.setDynamic(false); // Recursively reinitialize to propagate queue properties rootQueue.reinit(true); } @@ -121,7 +160,8 @@ public class QueueManager { */ public boolean removeLeafQueue(String name) { name = ensureRootPrefix(name); - return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT); + return !Boolean.FALSE.equals( + removeEmptyIncompatibleQueues(name, FSQueueType.PARENT).orElse(null)); } @@ -346,9 +386,13 @@ public class QueueManager { * * We will never remove the root queue or the default queue in this way. * - * @return true if we can create queueToCreate or it already exists. + * @return Optional.of(Boolean.TRUE) if there was an incompatible queue that + * has been removed, + * Optional.of(Boolean.FALSE) if there was an incompatible queue that + * have not be removed, + * Optional.empty() if there is no incompatible queue. */ - private boolean removeEmptyIncompatibleQueues(String queueToCreate, + private Optional<Boolean> removeEmptyIncompatibleQueues(String queueToCreate, FSQueueType queueType) { queueToCreate = ensureRootPrefix(queueToCreate); @@ -357,7 +401,7 @@ public class QueueManager { if (queueToCreate.equals(ROOT_QUEUE) || queueToCreate.startsWith( ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) { - return false; + return Optional.empty(); } FSQueue queue = queues.get(queueToCreate); @@ -365,19 +409,18 @@ public class QueueManager { if (queue != null) { if (queue instanceof FSLeafQueue) { if (queueType == FSQueueType.LEAF) { - // if queue is already a leaf then return true - return true; + return Optional.empty(); } // remove incompatibility since queue is a leaf currently // needs to change to a parent. - return removeQueueIfEmpty(queue); + return Optional.of(removeQueueIfEmpty(queue)); } else { if (queueType == FSQueueType.PARENT) { - return true; + return Optional.empty(); } // If it's an existing parent queue and needs to change to leaf, // remove it if it's empty. - return removeQueueIfEmpty(queue); + return Optional.of(removeQueueIfEmpty(queue)); } } @@ -389,11 +432,51 @@ public class QueueManager { String prefixString = queueToCreate.substring(0, sepIndex); FSQueue prefixQueue = queues.get(prefixString); if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) { - return removeQueueIfEmpty(prefixQueue); + return Optional.of(removeQueueIfEmpty(prefixQueue)); } sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); } - return true; + return Optional.empty(); + } + + /** + * Removes all empty dynamic queues (including empty dynamic parent queues). + */ + public void removeEmptyDynamicQueues() { + synchronized (queues) { + Set<FSParentQueue> parentQueuesToCheck = new HashSet<>(); + for (FSQueue queue : getQueues()) { + if (queue.isDynamic() && queue.getChildQueues().isEmpty()) { + boolean removed = removeQueueIfEmpty(queue); + if (removed && queue.getParent().isDynamic()) { + parentQueuesToCheck.add(queue.getParent()); + } + } + } + while (!parentQueuesToCheck.isEmpty()) { + FSParentQueue queue = parentQueuesToCheck.iterator().next(); + if (queue.getChildQueues().isEmpty()) { + removeQueue(queue); + if (queue.getParent().isDynamic()) { + parentQueuesToCheck.add(queue.getParent()); + } + } + parentQueuesToCheck.remove(queue); + } + } + } + + /** + * Re-checking incompatible queues that could not be removed earlier due to + * not being empty, and removing those that became empty. + */ + public void removePendingIncompatibleQueues() { + synchronized (queues) { + for (IncompatibleQueueRemovalTask removalTask : + ImmutableSet.copyOf(incompatibleQueuesPendingRemoval)) { + removalTask.execute(); + } + } } /** @@ -435,7 +518,8 @@ public class QueueManager { if (queue instanceof FSLeafQueue) { FSLeafQueue leafQueue = (FSLeafQueue)queue; return queue.getNumRunnableApps() == 0 && - leafQueue.getNumNonRunnableApps() == 0; + leafQueue.getNumNonRunnableApps() == 0 && + leafQueue.getNumAssignedApps() == 0; } else { for (FSQueue child : queue.getChildQueues()) { if (!isEmpty(child)) { @@ -501,21 +585,13 @@ public class QueueManager { LOG.error("Setting scheduling policies for existing queues failed!"); } - for (String name : queueConf.getConfiguredQueues().get( - FSQueueType.LEAF)) { - if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { - getLeafQueue(name, true, false); - } - } + ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, FSQueueType.LEAF); + // At this point all leaves and 'parents with // at least one child' would have been created. // Now create parents with no configured leaf. - for (String name : queueConf.getConfiguredQueues().get( - FSQueueType.PARENT)) { - if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { - getParentQueue(name, true, false); - } - } + ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, + FSQueueType.PARENT); } // Initialize all queues recursively @@ -524,6 +600,35 @@ public class QueueManager { rootQueue.recomputeSteadyShares(); } + private void ensureQueueExistsAndIsCompatibleAndIsStatic( + AllocationConfiguration queueConf, FSQueueType queueType) { + for (String name : queueConf.getConfiguredQueues().get(queueType)) { + Boolean removed = + removeEmptyIncompatibleQueues(name, queueType).orElse(null); + if (Boolean.FALSE.equals(removed)) { + incompatibleQueuesPendingRemoval.add( + new IncompatibleQueueRemovalTask(name, queueType)); + } else { + FSQueue queue = getQueue(name, true, queueType, false); + if (queue != null) { + queue.setDynamic(false); + } + } + } + } + + /** + * Setting a set of queues to dynamic. + * @param queueNames The names of the queues to be set to dynamic + */ + protected void setQueuesToDynamic(Set<String> queueNames) { + synchronized (queues) { + for (String queueName : queueNames) { + queues.get(queueName).setDynamic(true); + } + } + } + /** * Check whether queue name is valid, * return true if it is valid, otherwise return false. http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 8591d67..30b8a91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; @@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Fai import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; +import org.mockito.Mockito; + import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -79,7 +82,8 @@ public class TestAllocationFileLoaderService { fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath)); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(Mockito.mock(Listener.class)); Path allocationFile = allocLoader.getAllocationFile(conf); assertEquals(fsAllocPath, allocationFile.toString()); assertTrue(fs.exists(allocationFile)); @@ -92,7 +96,8 @@ public class TestAllocationFileLoaderService { throws UnsupportedFileSystemException { Configuration conf = new YarnConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile"); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(Mockito.mock(Listener.class)); allocLoader.getAllocationFile(conf); } @@ -105,7 +110,7 @@ public class TestAllocationFileLoaderService { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, TEST_FAIRSCHED_XML); AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(); + new AllocationFileLoaderService(Mockito.mock(Listener.class)); Path allocationFile = allocLoader.getAllocationFile(conf); assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName()); assertTrue(fs.exists(allocationFile)); @@ -134,12 +139,11 @@ public class TestAllocationFileLoaderService { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService( - clock); + ReloadListener confHolder = new ReloadListener(); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder, clock); allocLoader.reloadIntervalMs = 5; allocLoader.init(conf); - ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; @@ -205,7 +209,9 @@ public class TestAllocationFileLoaderService { public void testAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + ReloadListener confHolder = new ReloadListener(); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); AllocationFileWriter .create() @@ -278,8 +284,6 @@ public class TestAllocationFileLoaderService { .writeToFile(ALLOC_FILE); allocLoader.init(conf); - ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; @@ -427,7 +431,9 @@ public class TestAllocationFileLoaderService { public void testBackwardsCompatibleAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + ReloadListener confHolder = new ReloadListener(); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); @@ -473,8 +479,6 @@ public class TestAllocationFileLoaderService { out.close(); allocLoader.init(conf); - ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; @@ -550,10 +554,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; @@ -584,10 +588,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); } @@ -608,10 +612,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); } @@ -632,10 +636,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); } @@ -654,10 +658,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); try { allocLoader.reloadAllocations(); } catch (AllocationConfigurationException ex) { @@ -685,10 +689,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); try { allocLoader.reloadAllocations(); } catch (AllocationConfigurationException ex) { @@ -714,10 +718,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; // Check whether queue 'parent' and 'child' are loaded successfully @@ -745,10 +749,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); } @@ -767,10 +771,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); } @@ -793,10 +797,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; @@ -853,10 +857,10 @@ public class TestAllocationFileLoaderService { out.println("</allocations>"); out.close(); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); - allocLoader.setReloadListener(confHolder); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(confHolder); + allocLoader.init(conf); allocLoader.reloadAllocations(); } @@ -867,5 +871,9 @@ public class TestAllocationFileLoaderService { public void onReload(AllocationConfiguration info) { allocConf = info; } + + @Override + public void onCheck() { + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index eb2d402..3674ffb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -20,15 +20,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; public class TestQueueManager { @@ -305,4 +312,334 @@ public class TestQueueManager { assertEquals("createQueue() returned wrong queue", "root.queue1.queue2", q2.getName()); } + + @Test + public void testRemovalOfDynamicLeafQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", true); + + assertNotNull("Queue root.test.childB.dynamic1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childB.dynamic1", q1.getName()); + assertTrue("root.test.childB.dynamic1 is not a dynamic queue", + q1.isDynamic()); + + // an application is submitted to root.test.childB.dynamic1 + notEmptyQueues.add(q1); + + // root.test.childB.dynamic1 is not empty and should not be removed + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false); + assertNotNull("Queue root.test.childB.dynamic1 was deleted", q1); + + // the application finishes, the next removeEmptyDynamicQueues() should + // clean root.test.childB.dynamic1 up, but keep its static parent + notEmptyQueues.remove(q1); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false); + assertNull("Queue root.test.childB.dynamic1 was not deleted", q1); + assertNotNull("The static parent of root.test.childB.dynamic1 was deleted", + queueManager.getParentQueue("root.test.childB", false)); + } + + @Test + public void testRemovalOfDynamicParentQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.parent1.dynamic1", true); + + assertNotNull("Queue root.parent1.dynamic1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.parent1.dynamic1", q1.getName()); + assertTrue("root.parent1.dynamic1 is not a dynamic queue", q1.isDynamic()); + + FSQueue p1 = queueManager.getParentQueue("root.parent1", false); + assertNotNull("Queue root.parent1 was not created", p1); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.parent1.dynamic1", false); + p1 = queueManager.getParentQueue("root.parent1", false); + + assertNull("Queue root.parent1.dynamic1 was not deleted", q1); + assertNull("Queue root.parent1 was not deleted", p1); + } + + @Test + public void testNonEmptyDynamicQueueBecomingStaticQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true); + + assertNotNull("Queue root.leaf1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.leaf1", q1.getName()); + assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic()); + + // pretend that we submitted an app to the queue + notEmptyQueues.add(q1); + + // non-empty queues should not be deleted + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("Queue root.leaf1 was deleted", q1); + + // next we add leaf1 under root in the allocation config + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1"); + queueManager.updateAllocationConfiguration(allocConf); + + // updateAllocationConfiguration() should make root.leaf1 a dynamic queue + assertFalse("root.leaf1 is not a static queue", q1.isDynamic()); + + // application finished now and the queue is empty, but since leaf1 is a + // static queue at this point, hence not affected by + // removeEmptyDynamicQueues() + notEmptyQueues.clear(); + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("Queue root.leaf1 was deleted", q1); + assertFalse("root.leaf1 is not a static queue", q1.isDynamic()); + } + + @Test + public void testNonEmptyStaticQueueBecomingDynamicQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false); + + assertNotNull("Queue root.test.childA does not exist", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childA", q1.getName()); + assertFalse("root.test.childA is not a static queue", q1.isDynamic()); + + // we submitted an app to the queue + notEmptyQueues.add(q1); + + // the next removeEmptyDynamicQueues() call should not modify + // root.test.childA + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNotNull("Queue root.test.childA was deleted", q1); + assertFalse("root.test.childA is not a dynamic queue", q1.isDynamic()); + + // next we remove all queues from the allocation config, + // this causes all queues to change to dynamic + for (Set<String> queueNames : allocConf.configuredQueues.values()) { + queueManager.setQueuesToDynamic(queueNames); + queueNames.clear(); + } + queueManager.updateAllocationConfiguration(allocConf); + + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNotNull("Queue root.test.childA was deleted", q1); + assertTrue("root.test.childA is not a dynamic queue", q1.isDynamic()); + + // application finished - the queue does not have runnable app + // the next removeEmptyDynamicQueues() call should remove the queues + notEmptyQueues.remove(q1); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNull("Queue root.test.childA was not deleted", q1); + + FSParentQueue p1 = queueManager.getParentQueue("root.test", false); + assertNull("Queue root.test was not deleted", p1); + } + + @Test + public void testRemovalOfChildlessParentQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager.updateAllocationConfiguration(allocConf); + + FSParentQueue q1 = queueManager.getParentQueue("root.test.childB", false); + + assertNotNull("Queue root.test.childB was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childB", q1.getName()); + assertFalse("root.test.childB is a dynamic queue", q1.isDynamic()); + + // static queues should not be deleted + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getParentQueue("root.test.childB", false); + assertNotNull("Queue root.test.childB was deleted", q1); + + // next we remove root.test.childB from the allocation config + allocConf.configuredQueues.get(FSQueueType.PARENT) + .remove("root.test.childB"); + queueManager.updateAllocationConfiguration(allocConf); + queueManager.setQueuesToDynamic(Collections.singleton("root.test.childB")); + + // the next removeEmptyDynamicQueues() call should clean + // root.test.childB up + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getParentQueue("root.leaf1", false); + assertNull("Queue root.leaf1 was not deleted", q1); + } + + @Test + public void testQueueTypeChange() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.parent1.leaf1", true); + assertNotNull("Queue root.parent1.leaf1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.parent1.leaf1", q1.getName()); + assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic()); + + FSQueue p1 = queueManager.getParentQueue("root.parent1", false); + assertNotNull("Queue root.parent1 was not created", p1); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + + // adding root.parent1.leaf1 and root.parent1 to the allocation config + allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.parent1"); + allocConf.configuredQueues.get(FSQueueType.LEAF) + .add("root.parent1.leaf1"); + + // updateAllocationConfiguration() should change both queues over to static + queueManager.updateAllocationConfiguration(allocConf); + q1 = queueManager.getLeafQueue("root.parent1.leaf1", false); + assertFalse("root.parent1.leaf1 is not a static queue", q1.isDynamic()); + p1 = queueManager.getParentQueue("root.parent1", false); + assertFalse("root.parent1 is not a static queue", p1.isDynamic()); + + // removing root.parent1.leaf1 and root.parent1 from the allocation + // config + allocConf.configuredQueues.get(FSQueueType.PARENT).remove("root.parent1"); + allocConf.configuredQueues.get(FSQueueType.LEAF) + .remove("root.parent1.leaf1"); + + // updateAllocationConfiguration() should change both queues + // to dynamic + queueManager.updateAllocationConfiguration(allocConf); + queueManager.setQueuesToDynamic( + ImmutableSet.of("root.parent1", "root.parent1.leaf1")); + q1 = queueManager.getLeafQueue("root.parent1.leaf1", false); + assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic()); + p1 = queueManager.getParentQueue("root.parent1", false); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + } + + @Test + public void testApplicationAssignmentPreventsRemovalOfDynamicQueue() + throws Exception { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager = new QueueManager(scheduler); + queueManager.initialize(conf); + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q = queueManager.getLeafQueue("root.leaf1", true); + assertNotNull("root.leaf1 does not exist", q); + assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q)); + + // assigning an application (without an appAttempt so far) to the queue + // removeEmptyDynamicQueues() should not remove the queue + ApplicationId applicationId = ApplicationId.newInstance(1L, 0); + q.addAssignedApp(applicationId); + q = queueManager.getLeafQueue("root.leaf1", false); + assertFalse("root.leaf1 is empty", queueManager.isEmpty(q)); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("root.leaf1 has been removed", q); + assertFalse("root.leaf1 is empty", queueManager.isEmpty(q)); + + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + ActiveUsersManager activeUsersManager = + Mockito.mock(ActiveUsersManager.class); + RMContext rmContext = Mockito.mock(RMContext.class); + + // the appAttempt is created + // removeEmptyDynamicQueues() should not remove the queue + FSAppAttempt appAttempt = new FSAppAttempt(scheduler, applicationAttemptId, + "a_user", q, activeUsersManager, rmContext); + q.addApp(appAttempt, true); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("root.leaf1 has been removed", q); + assertFalse("root.leaf1 is empty", queueManager.isEmpty(q)); + + // the appAttempt finished, the queue should be empty + q.removeApp(appAttempt); + q = queueManager.getLeafQueue("root.leaf1", false); + assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q)); + + // removeEmptyDynamicQueues() should remove the queue + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.leaf1", false); + assertNull("root.leaf1 has not been removed", q); + } + + @Test + public void testRemovalOfIncompatibleNonEmptyQueue() + throws Exception { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a"); + scheduler.allocConf = allocConf; + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q = queueManager.getLeafQueue("root.a", true); + assertNotNull("root.a does not exist", q); + assertTrue("root.a is not empty", queueManager.isEmpty(q)); + + // we start to run an application on root.a + notEmptyQueues.add(q); + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a does not exist", q); + assertFalse("root.a is empty", queueManager.isEmpty(q)); + + // root.a should not be removed by removeEmptyDynamicQueues or by + // removePendingIncompatibleQueues + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a does not exist", q); + + // let's introduce queue incompatibility + allocConf.configuredQueues.get(FSQueueType.LEAF).remove("root.a"); + allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.a"); + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a.b"); + queueManager.updateAllocationConfiguration(allocConf); + + // since root.a has running applications, it should be still a leaf queue + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a has been removed", q); + assertFalse("root.a is empty", queueManager.isEmpty(q)); + + // removePendingIncompatibleQueues should still keep root.a as a leaf queue + queueManager.removePendingIncompatibleQueues(); + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a has been removed", q); + assertFalse("root.a is empty", queueManager.isEmpty(q)); + + // when the application finishes, root.a should be a parent queue + notEmptyQueues.clear(); + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + FSParentQueue p = queueManager.getParentQueue("root.a", false); + assertNotNull("root.a does not exist", p); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org