Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu May 9 22:46:39 2013 @@ -23,11 +23,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -57,9 +58,12 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; +@Private +@Unstable public class FSSchedulerApp extends SchedulerApplication { private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); @@ -83,7 +87,9 @@ public class FSSchedulerApp extends Sche final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = new HashMap<Priority, Map<NodeId, RMContainer>>(); - + + final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>(); + /** * Count how many times the application has been given an opportunity * to schedule a task at each priority. Each time the scheduler @@ -230,6 +236,9 @@ public class FSSchedulerApp extends Sche Resource containerResource = rmContainer.getContainer().getResource(); queue.getMetrics().releaseResources(getUser(), 1, containerResource); Resources.subtractFrom(currentConsumption, containerResource); + + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); } synchronized public List<Container> pullNewlyAllocatedContainers() { @@ -306,8 +315,7 @@ public class FSSchedulerApp extends Sche * Used only by unit tests * @return total current reservations */ - @Stable - @Private + @VisibleForTesting public synchronized Resource getCurrentReservation() { return currentReservation; } @@ -572,4 +580,18 @@ public class FSSchedulerApp extends Sche " priority " + priority); allowedLocalityLevel.put(priority, level); } + + // related methods + public void addPreemption(RMContainer container, long time) { + assert preemptionMap.get(container) == null; + preemptionMap.put(container, time); + } + + public Long getContainerPreemptionTime(RMContainer container) { + return preemptionMap.get(container); + } + + public Set<RMContainer> getPreemptionContainers() { + return preemptionMap.keySet(); + } }
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Thu May 9 22:46:39 2013 @@ -25,6 +25,8 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -39,6 +41,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +@Private +@Unstable public class FSSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May 9 22:46:39 2013 @@ -24,8 +24,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -155,10 +158,16 @@ public class FairScheduler implements Re private Resource clusterCapacity = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - // How often tasks are preempted (must be longer than a couple + // How often tasks are preempted + protected long preemptionInterval; + + // ms to wait before force killing stuff (must be longer than a couple // of heartbeats to give task-kill commands a chance to act). - protected long preemptionInterval = 15000; - + protected long waitTimeBeforeKill; + + // Containers whose AMs have been warned that they will be preempted soon. + private List<RMContainer> warnedContainers = new ArrayList<RMContainer>(); + protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster @@ -225,10 +234,6 @@ public class FairScheduler implements Re // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); - - // Update recorded capacity of root queue (child queues are updated - // when fair share is calculated). - rootMetrics.setAvailableResourcesToQueue(clusterCapacity); } /** @@ -335,34 +340,78 @@ public class FairScheduler implements Re // Sort containers into reverse order of priority Collections.sort(runningContainers, new Comparator<RMContainer>() { public int compare(RMContainer c1, RMContainer c2) { - return c2.getContainer().getPriority().compareTo( + int ret = c2.getContainer().getPriority().compareTo( c1.getContainer().getPriority()); + if (ret == 0) { + return c2.getContainerId().compareTo(c1.getContainerId()); + } + return ret; } }); + + // Scan down the list of containers we've already warned and kill them + // if we need to. Remove any containers from the list that we don't need + // or that are no longer running. + Iterator<RMContainer> warnedIter = warnedContainers.iterator(); + Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>(); + while (warnedIter.hasNext()) { + RMContainer container = warnedIter.next(); + if (container.getState() == RMContainerState.RUNNING && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + warnOrKillContainer(container, apps.get(container), queues.get(container)); + preemptedThisRound.add(container); + Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + } else { + warnedIter.remove(); + } + } - // Scan down the sorted list of task statuses until we've killed enough - // tasks, making sure we don't kill too many from any queue - for (RMContainer container : runningContainers) { + // Scan down the rest of the containers until we've preempted enough, making + // sure we don't preempt too many from any queue + Iterator<RMContainer> runningIter = runningContainers.iterator(); + while (runningIter.hasNext() && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + RMContainer container = runningIter.next(); FSLeafQueue sched = queues.get(container); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + - "res=" + container.getContainer().getResource() + - ") from queue " + sched.getName()); - ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus( + if (!preemptedThisRound.contains(container) && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), sched.getFairShare())) { + warnOrKillContainer(container, apps.get(container), sched); + + warnedContainers.add(container); + Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + } + } + } + + private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, + FSLeafQueue queue) { + LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + + "res=" + container.getContainer().getResource() + + ") from queue " + queue.getName()); + + Long time = app.getContainerPreemptionTime(container); + + if (time != null) { + // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, + // proceed with kill + if (time + waitTimeBeforeKill < clock.getTime()) { + ContainerStatus status = + SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). completedContainer(container, status, RMContainerEventType.KILL); - - toPreempt = Resources.subtract(toPreempt, - container.getContainer().getResource()); - if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity, - toPreempt, Resources.none())) { - break; - } + LOG.info("Killing container" + container + + " (after waiting for premption for " + + (clock.getTime() - time) + "ms)"); } + } else { + // track the request in the FSSchedulerApp itself + app.addPreemption(container, clock.getTime()); } } @@ -487,11 +536,11 @@ public class FairScheduler implements Re return clusterCapacity; } - public Clock getClock() { + public synchronized Clock getClock() { return clock; } - protected void setClock(Clock clock) { + protected synchronized void setClock(Clock clock) { this.clock = clock; } @@ -617,6 +666,7 @@ public class FairScheduler implements Re } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); + updateRootQueueMetrics(); } LOG.info("Application " + applicationAttemptId + @@ -628,6 +678,7 @@ public class FairScheduler implements Re private synchronized void addNode(RMNode node) { nodes.put(node.getNodeID(), new FSSchedulerNode(node)); Resources.addTo(clusterCapacity, node.getTotalCapability()); + updateRootQueueMetrics(); LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: " + clusterCapacity); @@ -636,6 +687,7 @@ public class FairScheduler implements Re private synchronized void removeNode(RMNode rmNode) { FSSchedulerNode node = nodes.get(rmNode.getNodeID()); Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); + updateRootQueueMetrics(); // Remove running containers List<RMContainer> runningContainers = node.getRunningContainers(); @@ -746,10 +798,18 @@ public class FairScheduler implements Re LOG.debug("allocate:" + " applicationAttemptId=" + appAttemptId + " #ask=" + ask.size()); - } + LOG.debug("Preempting " + application.getPreemptionContainers().size() + + " container(s)"); + } + + Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>(); + for (RMContainer container : application.getPreemptionContainers()) { + preemptionContainerIds.add(container.getContainerId()); + } + return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + application.getHeadroom(), preemptionContainerIds); } } @@ -832,6 +892,7 @@ public class FairScheduler implements Re if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, queueMgr.getRootQueue().assignContainer(node), Resources.none())) { + assignedContainers++; assignedContainer = true; } if (!assignedContainer) { break; } @@ -839,6 +900,7 @@ public class FairScheduler implements Re if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; } } } + updateRootQueueMetrics(); } @Override @@ -860,6 +922,18 @@ public class FairScheduler implements Re } return new SchedulerAppReport(applications.get(appAttemptId)); } + + /** + * Subqueue metrics might be a little out of date because fair shares are + * recalculated at the update interval, but the root queue metrics needs to + * be updated synchronously with allocations and completions so that cluster + * metrics will be consistent. + */ + private void updateRootQueueMetrics() { + rootMetrics.setAvailableResourcesToQueue( + Resources.subtract( + clusterCapacity, rootMetrics.getAllocatedResources())); + } @Override public QueueMetrics getRootQueueMetrics() { @@ -950,7 +1024,9 @@ public class FairScheduler implements Re assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); sizeBasedWeight = this.conf.getSizeBasedWeight(); - + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + if (!initialized) { rootMetrics = QueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu May 9 22:46:39 2013 @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.File; - +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +@Private +@Evolving public class FairSchedulerConfiguration extends Configuration { public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml"; @@ -52,6 +55,11 @@ public class FairSchedulerConfiguration /** Whether preemption is enabled. */ protected static final String PREEMPTION = CONF_PREFIX + "preemption"; protected static final boolean DEFAULT_PREEMPTION = false; + + protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval"; + protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000; + protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill"; + protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000; /** Whether to assign multiple containers in one check-in. */ protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple"; @@ -120,4 +128,12 @@ public class FairSchedulerConfiguration return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir", "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); } + + public int getPreemptionInterval() { + return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL); + } + + public int getWaitTimeBeforeKill() { + return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Thu May 9 22:46:39 2013 @@ -22,14 +22,14 @@ import java.util.Comparator; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @Public -@Unstable +@Evolving public abstract class SchedulingPolicy { private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances = new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>(); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Thu May 9 22:46:39 2013 @@ -21,6 +21,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; @@ -29,6 +31,8 @@ import org.apache.hadoop.yarn.server.res import com.google.common.annotations.VisibleForTesting; +@Private +@Unstable public class FairSharePolicy extends SchedulingPolicy { @VisibleForTesting public static final String NAME = "Fairshare"; Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Thu May 9 22:46:39 2013 @@ -21,6 +21,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; @@ -28,6 +30,8 @@ import org.apache.hadoop.yarn.server.res import com.google.common.annotations.VisibleForTesting; +@Private +@Unstable public class FifoPolicy extends SchedulingPolicy { @VisibleForTesting public static final String NAME = "FIFO"; Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Thu May 9 22:46:39 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.Map; @@ -199,6 +200,8 @@ public class MockRM extends ResourceMana return client.submitApplication(req); } catch (YarnRemoteException e) { e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); } return null; } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Thu May 9 22:46:39 2013 @@ -339,7 +339,7 @@ public class TestClientRMTokens { DelegationToken token = loggedInUser .doAs(new PrivilegedExceptionAction<DelegationToken>() { @Override - public DelegationToken run() throws YarnRemoteException { + public DelegationToken run() throws YarnRemoteException, IOException { GetDelegationTokenRequest request = Records .newRecord(GetDelegationTokenRequest.class); request.setRenewer(renewerString); @@ -355,7 +355,7 @@ public class TestClientRMTokens { throws IOException, InterruptedException { long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() { @Override - public Long run() throws YarnRemoteException { + public Long run() throws YarnRemoteException, IOException { RenewDelegationTokenRequest request = Records .newRecord(RenewDelegationTokenRequest.class); request.setDelegationToken(dToken); @@ -371,7 +371,7 @@ public class TestClientRMTokens { throws IOException, InterruptedException { loggedInUser.doAs(new PrivilegedExceptionAction<Void>() { @Override - public Void run() throws YarnRemoteException { + public Void run() throws YarnRemoteException, IOException { CancelDelegationTokenRequest request = Records .newRecord(CancelDelegationTokenRequest.class); request.setDelegationToken(dToken); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Thu May 9 22:46:39 2013 @@ -66,20 +66,20 @@ public class TestQueueMetrics { MetricsSource userSource = userSource(ms, queueName, user); checkApps(queueSource, 1, 1, 0, 0, 0, 0); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); + checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); metrics.incrAppsRunning(app, user); checkApps(queueSource, 1, 0, 1, 0, 0, 0); - metrics.allocateResources(user, 3, Resources.createResource(2*GB)); - checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - metrics.releaseResources(user, 1, Resources.createResource(2*GB)); - checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); + metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); metrics.finishApp(app, RMAppAttemptState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0); @@ -148,25 +148,25 @@ public class TestQueueMetrics { checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); - checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); + checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); metrics.incrAppsRunning(app, user); checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0); - metrics.allocateResources(user, 3, Resources.createResource(2*GB)); - checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); - checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0); - - metrics.releaseResources(user, 1, Resources.createResource(2*GB)); - checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); - checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); + + metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); metrics.finishApp(app, RMAppAttemptState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0); @@ -197,35 +197,35 @@ public class TestQueueMetrics { checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0); - parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); - metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); - checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); - checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); - checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); - checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); + parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); + checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); + checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); metrics.incrAppsRunning(app, user); checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0); - metrics.allocateResources(user, 3, Resources.createResource(2*GB)); - metrics.reserveResource(user, Resources.createResource(3*GB)); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + metrics.reserveResource(user, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); - checkResources(parentQueueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); - checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); - checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); - - metrics.releaseResources(user, 1, Resources.createResource(2*GB)); - metrics.unreserveResource(user, Resources.createResource(3*GB)); - checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); - checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); - checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); - checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); + checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); + checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); + checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); + checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); + + metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + metrics.unreserveResource(user, Resources.createResource(3*GB, 3)); + checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); + checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); metrics.finishApp(app, RMAppAttemptState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0); @@ -277,18 +277,23 @@ public class TestQueueMetrics { } public static void checkResources(MetricsSource source, int allocatedMB, - int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, - int availableMB, int pendingMB, int pendingCtnrs, - int reservedMB, int reservedCtnrs) { + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB, + int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores, + int reservedCtnrs) { MetricsRecordBuilder rb = getMetrics(source); assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); assertGauge("AllocatedContainers", allocCtnrs, rb); assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); assertGauge("PendingContainers", pendingCtnrs, rb); assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); assertGauge("ReservedContainers", reservedCtnrs, rb); } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May 9 22:46:39 2013 @@ -30,6 +30,7 @@ import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; @@ -127,6 +129,7 @@ public class TestFairScheduler { public void tearDown() { scheduler = null; resourceManager = null; + QueueMetrics.clearQueueMetrics(); } private Configuration createConfiguration() { @@ -336,6 +339,13 @@ public class TestFairScheduler { assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). getResourceUsage().getMemory()); + + // verify metrics + QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1") + .getMetrics(); + assertEquals(1024, queue1Metrics.getAllocatedMB()); + assertEquals(1024, scheduler.getRootQueueMetrics().getAllocatedMB()); + assertEquals(512, scheduler.getRootQueueMetrics().getAvailableMB()); } @Test (timeout = 5000) @@ -891,9 +901,16 @@ public class TestFairScheduler { */ public void testChoiceOfPreemptedContainers() throws Exception { Configuration conf = createConfiguration(); + + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); @@ -988,15 +1005,38 @@ public class TestFairScheduler { Resources.createResource(2 * 1024)); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + + // First verify we are adding containers to preemption list for the application + assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(), + scheduler.applications.get(app3).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(), + scheduler.applications.get(app6).getPreemptionContainers())); + + // Pretend 15 seconds have passed + clock.tick(15); + + // Trigger a kill by insisting we want containers back + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), + Resources.createResource(2 * 1024)); + + // At this point the containers should have been killed (since we are not simulating AM) assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + + // Trigger a kill by insisting we want containers back + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), + Resources.createResource(2 * 1024)); + + // Pretend 15 seconds have passed + clock.tick(15); // We should be able to claw back another container from A and B each. // Make sure it is lowest priority container. scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); @@ -1245,6 +1285,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); assertEquals(1, app.getLiveContainers().size()); + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); // Create request at higher priority createSchedulingRequestExistingApplication(1024, 1, attId); @@ -1260,6 +1301,7 @@ public class TestFairScheduler { // Complete container scheduler.allocate(attId, new ArrayList<ResourceRequest>(), Arrays.asList(containerId)); + assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); // Schedule at opening scheduler.update(); @@ -1271,6 +1313,7 @@ public class TestFairScheduler { for (RMContainer liveContainer : liveContainers) { Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority()); } + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); } @Test @@ -1382,6 +1425,37 @@ public class TestFairScheduler { assertEquals(1, app2.getLiveContainers().size()); } + @Test(timeout = 3000) + public void testMaxAssign() throws AllocationConfigurationException { + // set required scheduler configs + scheduler.assignMultiple = true; + scheduler.getQueueManager().getLeafQueue("root.default") + .setPolicy(SchedulingPolicy.getDefault()); + + RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384)); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + + ApplicationAttemptId attId = + createSchedulingRequest(1024, "root.default", "user", 8); + FSSchedulerApp app = scheduler.applications.get(attId); + + // set maxAssign to 2: only 2 containers should be allocated + scheduler.maxAssign = 2; + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Incorrect number of containers allocated", 2, app + .getLiveContainers().size()); + + // set maxAssign to -1: all remaining containers should be allocated + scheduler.maxAssign = -1; + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Incorrect number of containers allocated", 8, app + .getLiveContainers().size()); + } + /** * Test to verify the behavior of * {@link FSQueue#assignContainer(FSSchedulerNode)}) @@ -1544,4 +1618,24 @@ public class TestFairScheduler { assertEquals(1, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); } + + @Test + public void testRemoveNodeUpdatesRootQueueMetrics() { + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); + + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1); + scheduler.handle(addEvent); + + assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); + scheduler.update(); // update shouldn't change things + assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); + + NodeRemovedSchedulerEvent removeEvent = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeEvent); + + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); + scheduler.update(); // update shouldn't change things + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java Thu May 9 22:46:39 2013 @@ -485,7 +485,8 @@ public class TestContainerManagerSecurit } private Container requestAndGetContainer(AMRMProtocol scheduler, - ApplicationId appID) throws YarnRemoteException, InterruptedException { + ApplicationId appID) throws YarnRemoteException, InterruptedException, + IOException { // Request a container allocation. List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1480824&r1=1480823&r2=1480824&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Thu May 9 22:46:39 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.webproxy; +import java.io.IOException; import java.net.InetSocketAddress; import org.apache.commons.logging.Log; @@ -78,9 +79,10 @@ public class AppReportFetcher { * @param appId the id of the application to get. * @return the ApplicationReport for that app. * @throws YarnRemoteException on any error. + * @throws IOException */ public ApplicationReport getApplicationReport(ApplicationId appId) - throws YarnRemoteException { + throws YarnRemoteException, IOException { GetApplicationReportRequest request = recordFactory .newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId);