Author: acmurthy
Date: Sat Jul 20 14:59:41 2013
New Revision: 1505146

URL: http://svn.apache.org/r1505146
Log:
YARN-897. Ensure child queues are ordered correctly to account for completed 
containers. Contributed by Djellel Eddine Difallah.

Added:
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
   (with props)
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1505146&r1=1505145&r2=1505146&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Jul 20 14:59:41 2013
@@ -796,6 +796,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-814. Improving diagnostics when containers fail during launch due to
     various reasons like invalid env etc. (Jian He via vinodkv)
 
+    YARN-897. Ensure child queues are ordered correctly to account for
+    completed containers. (Djellel Eddine Difallah via acmurthy)
+
 Release 2.0.5-alpha - 06/06/2013
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1505146&r1=1505145&r2=1505146&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
 Sat Jul 20 14:59:41 2013
@@ -185,12 +185,13 @@ extends org.apache.hadoop.yarn.server.re
    *                  <code>null</code> if it was just a reservation
    * @param containerStatus <code>ContainerStatus</code> for the completed 
    *                        container
+   * @param childQueue <code>CSQueue</code> to reinsert in childQueues 
    * @param event event to be sent to the container
    */
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, 
       RMContainer container, ContainerStatus containerStatus, 
-      RMContainerEventType event);
+      RMContainerEventType event, CSQueue childQueue);
 
   /**
    * Get the number of applications in the queue.

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1505146&r1=1505145&r2=1505146&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 Sat Jul 20 14:59:41 2013
@@ -673,7 +673,7 @@ public class CapacityScheduler
           SchedulerUtils.createAbnormalContainerStatus(
               container.getId(), 
               SchedulerUtils.UNRESERVED_CONTAINER), 
-          RMContainerEventType.RELEASED);
+          RMContainerEventType.RELEASED, null);
       }
 
     }
@@ -828,7 +828,7 @@ public class CapacityScheduler
     // Inform the queue
     LeafQueue queue = (LeafQueue)application.getQueue();
     queue.completedContainer(clusterResource, application, node, 
-        rmContainer, containerStatus, event);
+        rmContainer, containerStatus, event, null);
 
     LOG.info("Application " + applicationAttemptId + 
         " released container " + container.getId() +

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1505146&r1=1505145&r2=1505146&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 Sat Jul 20 14:59:41 2013
@@ -1407,7 +1407,7 @@ public class LeafQueue implements CSQueu
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
-      ContainerStatus containerStatus, RMContainerEventType event) {
+      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
     if (application != null) {
       // Careful! Locking order is important!
       synchronized (this) {
@@ -1442,7 +1442,7 @@ public class LeafQueue implements CSQueu
               " cluster=" + clusterResource);
           // Inform the parent queue
           getParent().completedContainer(clusterResource, application,
-              node, rmContainer, null, event);
+              node, rmContainer, null, event, this);
         }
       }
 

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1505146&r1=1505145&r2=1505146&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
 Sat Jul 20 14:59:41 2013
@@ -655,7 +655,7 @@ public class ParentQueue implements CSQu
               assignment.getResource(), Resources.none())) {
         // Remove and re-insert to sort
         iter.remove();
-        LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
+        LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + 
             " stats: " + childQueue);
         childQueues.add(childQueue);
         if (LOG.isDebugEnabled()) {
@@ -685,7 +685,8 @@ public class ParentQueue implements CSQu
   @Override
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, 
-      RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
+      RMContainer rmContainer, ContainerStatus containerStatus, 
+      RMContainerEventType event, CSQueue completedChildQueue) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
@@ -701,10 +702,24 @@ public class ParentQueue implements CSQu
             " cluster=" + clusterResource);
       }
 
+      // reinsert the updated queue
+      for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
+        CSQueue csqueue = iter.next();
+        if(csqueue.equals(completedChildQueue))
+        {
+          iter.remove();
+          LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() + 
+              " stats: " + csqueue);
+          childQueues.add(csqueue);
+          break;
+        }
+      }
+      
       // Inform the parent
       if (parent != null) {
+        // complete my parent
         parent.completedContainer(clusterResource, application, 
-            node, rmContainer, null, event);
+            node, rmContainer, null, event, this);
       }    
     }
   }

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java?rev=1505146&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
 Sat Jul 20 14:59:41 2013
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestChildQueueOrder {
+
+  private static final Log LOG = LogFactory.getLog(TestChildQueueOrder.class);
+
+  RMContext rmContext;
+  YarnConfiguration conf;
+  CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+
+  final static int GB = 1024;
+  final static String DEFAULT_RACK = "/default";
+
+  private final ResourceCalculator resourceComparator =
+    new DefaultResourceCalculator();
+
+  @Before
+  public void setUp() throws Exception {
+    rmContext = TestUtils.getMockRMContext();
+    conf = new YarnConfiguration();
+    csConf = new CapacitySchedulerConfiguration();
+
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(GB, 1));
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(16*GB, 32));
+    when(csContext.getClusterResources()).
+    thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
+    when(csContext.getApplicationComparator()).
+    thenReturn(CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).
+    thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).
+    thenReturn(resourceComparator);
+  }
+
+  private FiCaSchedulerApp getMockApplication(int appId, String user) {
+    FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
+    doReturn(user).when(application).getUser();
+    doReturn(Resources.createResource(0, 0)).when(application).getHeadroom();
+    return application;
+  }
+
+  private void stubQueueAllocation(final CSQueue queue, 
+      final Resource clusterResource, final FiCaSchedulerNode node, 
+      final int allocation) {
+    stubQueueAllocation(queue, clusterResource, node, allocation, 
+        NodeType.NODE_LOCAL);
+  }
+
+  private void stubQueueAllocation(final CSQueue queue, 
+      final Resource clusterResource, final FiCaSchedulerNode node, 
+      final int allocation, final NodeType type) {
+
+    // Simulate the queue allocation
+    doAnswer(new Answer<CSAssignment>() {
+      @Override
+      public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
+        try {
+          throw new Exception();
+        } catch (Exception e) {
+          LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() + 
+              " alloc=" + allocation + " node=" + node.getHostName());
+        }
+        final Resource allocatedResource = Resources.createResource(allocation);
+        if (queue instanceof ParentQueue) {
+          ((ParentQueue)queue).allocateResource(clusterResource, 
+              allocatedResource);
+        } else {
+          FiCaSchedulerApp app1 = getMockApplication(0, "");
+          ((LeafQueue)queue).allocateResource(clusterResource, app1, 
+              allocatedResource);
+        }
+
+        // Next call - nothing
+        if (allocation > 0) {
+          doReturn(new CSAssignment(Resources.none(), type)).
+          when(queue).assignContainers(eq(clusterResource), eq(node));
+
+          // Mock the node's resource availability
+          Resource available = node.getAvailableResource();
+          doReturn(Resources.subtractFrom(available, allocatedResource)).
+          when(node).getAvailableResource();
+        }
+
+        return new CSAssignment(allocatedResource, type);
+      }
+    }).
+    when(queue).assignContainers(eq(clusterResource), eq(node));
+    doNothing().when(node).releaseContainer(any(Container.class));
+  }
+
+
+  private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
+      int expectedMemory, Resource clusterResource) {
+    return (
+        ((float)expectedMemory / (float)clusterResource.getMemory())
+      );
+  }
+
+  private float computeQueueUsedCapacity(CSQueue queue,
+      int expectedMemory, Resource clusterResource) {
+    return (expectedMemory / 
+        (clusterResource.getMemory() * queue.getAbsoluteCapacity()));
+  }
+
+  final static float DELTA = 0.0001f;
+  private void verifyQueueMetrics(CSQueue queue, 
+      int expectedMemory, Resource clusterResource) {
+    assertEquals(
+        computeQueueAbsoluteUsedCapacity(queue, expectedMemory, clusterResource), 
+        queue.getAbsoluteUsedCapacity(), 
+        DELTA);
+    assertEquals(
+        computeQueueUsedCapacity(queue, expectedMemory, clusterResource), 
+        queue.getUsedCapacity(), 
+        DELTA);
+
+  }
+
+  private static final String A = "a";
+  private static final String B = "b";
+  private static final String C = "c";
+  private static final String D = "d";
+
+  private void setupSortedQueues(CapacitySchedulerConfiguration conf) {
+
+    // Define queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
+
+    final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
+    conf.setCapacity(Q_A, 25);
+
+    final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
+    conf.setCapacity(Q_B, 25);
+
+    final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
+    conf.setCapacity(Q_C, 25);
+
+    final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
+    conf.setCapacity(Q_D, 25);
+  }
+
+  @Test
+  public void testSortedQueues() throws Exception {
+    // Setup queue configs
+    setupSortedQueues(csConf);
+    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+    CSQueue root = 
+      CapacityScheduler.parseQueue(csContext, csConf, null, 
+          CapacitySchedulerConfiguration.ROOT, queues, queues, 
+          TestUtils.spyHook);
+
+    // Setup some nodes
+    final int memoryPerNode = 10;
+    final int coresPerNode = 16;
+    final int numNodes = 1;
+
+    FiCaSchedulerNode node_0 = 
+      TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
+    doNothing().when(node_0).releaseContainer(any(Container.class));
+    
+    final Resource clusterResource = 
+      Resources.createResource(numNodes * (memoryPerNode*GB), 
+          numNodes * coresPerNode);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Start testing
+    CSQueue a = queues.get(A);
+    CSQueue b = queues.get(B);
+    CSQueue c = queues.get(C);
+    CSQueue d = queues.get(D);
+
+    final String user_0 = "user_0";
+
+    // Stub an App and its containerCompleted
+    FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
+    doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
+        any(ContainerStatus.class),any(RMContainerEventType.class));
+
+    //
+    Priority priority = TestUtils.createMockPriority(1); 
+    ContainerAllocationExpirer expirer = 
+      mock(ContainerAllocationExpirer.class);
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    EventHandler eventHandler = drainDispatcher.getEventHandler();
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        app_0.getApplicationId(), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    Container container=TestUtils.getMockContainer(containerId, 
+        node_0.getNodeID(), Resources.createResource(1*GB), priority);
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        node_0.getNodeID(), eventHandler, expirer);
+
+    // Assign {1,2,3,4} 1GB containers respectively to queues
+    stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+    stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+    root.assignContainers(clusterResource, node_0);
+    for(int i=0; i < 2; i++)
+    {
+      stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+      stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+      root.assignContainers(clusterResource, node_0);
+    } 
+    for(int i=0; i < 3; i++)
+    {
+      stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(c, clusterResource, node_0, 1*GB);
+      stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+      root.assignContainers(clusterResource, node_0);
+    }  
+    for(int i=0; i < 4; i++)
+    {
+      stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(d, clusterResource, node_0, 1*GB);
+      root.assignContainers(clusterResource, node_0);
+    }    
+    verifyQueueMetrics(a, 1*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 4*GB, clusterResource);
+    LOG.info("status child-queues: " + ((ParentQueue)root).
+        getChildQueuesToPrint());
+
+    //Release 3 x 1GB containers from D
+    for(int i=0; i < 3;i++)
+    {
+      d.completedContainer(clusterResource, app_0, node_0,
+          rmContainer, null, RMContainerEventType.KILL, null);
+    }
+    verifyQueueMetrics(a, 1*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 1*GB, clusterResource);
+    //reset manually resources on node
+    node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
+        (memoryPerNode-1-2-3-1)*GB);
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+
+
+    // Assign 2 x 1GB Containers to A 
+    for(int i=0; i < 2; i++)
+    {
+      stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+      stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+      stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+      root.assignContainers(clusterResource, node_0);
+    }
+    verifyQueueMetrics(a, 3*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 1*GB, clusterResource);
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+
+    //Release 1GB Container from A
+    a.completedContainer(clusterResource, app_0, node_0, 
+        rmContainer, null, RMContainerEventType.KILL, null);
+    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 1*GB, clusterResource);
+    //reset manually resources on node
+    node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
+        (memoryPerNode-2-2-3-1)*GB);
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+
+    // Assign 1GB container to B 
+    stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+    stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+    root.assignContainers(clusterResource, node_0);
+    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(b, 3*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 1*GB, clusterResource);
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+
+    //Release 1GB container resources from B
+    b.completedContainer(clusterResource, app_0, node_0, 
+        rmContainer, null, RMContainerEventType.KILL, null);
+    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 1*GB, clusterResource);
+    //reset manually resources on node
+    node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, 
+        (memoryPerNode-2-2-3-1)*GB);
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+
+    // Assign 1GB container to A
+    stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+    stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+    root.assignContainers(clusterResource, node_0);
+    verifyQueueMetrics(a, 3*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 1*GB, clusterResource);
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+
+    // Now do the real test, where B and D request a 1GB container
+    // D should get the next container if the order is correct
+    stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+    stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+    stubQueueAllocation(d, clusterResource, node_0, 1*GB);
+    root.assignContainers(clusterResource, node_0);
+    InOrder allocationOrder = inOrder(d,b);
+    allocationOrder.verify(d).assignContainers(eq(clusterResource), 
+        any(FiCaSchedulerNode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+        any(FiCaSchedulerNode.class));
+    verifyQueueMetrics(a, 3*GB, clusterResource);
+    verifyQueueMetrics(b, 2*GB, clusterResource);
+    verifyQueueMetrics(c, 3*GB, clusterResource);
+    verifyQueueMetrics(d, 2*GB, clusterResource); //D got the container
+    LOG.info("status child-queues: " + 
+        ((ParentQueue)root).getChildQueuesToPrint());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+}

Propchange: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1505146&r1=1505145&r2=1505146&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 Sat Jul 20 14:59:41 2013
@@ -227,7 +227,7 @@ public class TestLeafQueue {
     doNothing().when(parent).completedContainer(
         any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), 
         any(RMContainer.class), any(ContainerStatus.class), 
-        any(RMContainerEventType.class));
+        any(RMContainerEventType.class), any(CSQueue.class));
     
     return queue;
   }
@@ -480,7 +480,7 @@ public class TestLeafQueue {
     // Release each container from app_0
     for (RMContainer rmContainer : app_0.getLiveContainers()) {
       a.completedContainer(clusterResource, app_0, node_0, rmContainer, 
-          null, RMContainerEventType.KILL);
+          null, RMContainerEventType.KILL, null);
     }
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -491,7 +491,7 @@ public class TestLeafQueue {
     // Release each container from app_1
     for (RMContainer rmContainer : app_1.getLiveContainers()) {
       a.completedContainer(clusterResource, app_1, node_0, rmContainer, 
-          null, RMContainerEventType.KILL);
+          null, RMContainerEventType.KILL, null);
     }
     assertEquals(0*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -850,7 +850,7 @@ public class TestLeafQueue {
     // 8. Release each container from app_0
     for (RMContainer rmContainer : app_0.getLiveContainers()) {
       a.completedContainer(clusterResource, app_0, node_0, rmContainer, 
-          null, RMContainerEventType.KILL);
+          null, RMContainerEventType.KILL, null);
     }
     assertEquals(5*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -861,7 +861,7 @@ public class TestLeafQueue {
     // 9. Release each container from app_2
     for (RMContainer rmContainer : app_2.getLiveContainers()) {
       a.completedContainer(clusterResource, app_2, node_0, rmContainer, 
-          null, RMContainerEventType.KILL);
+          null, RMContainerEventType.KILL, null);
     }
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -872,7 +872,7 @@ public class TestLeafQueue {
     // 10. Release each container from app_3
     for (RMContainer rmContainer : app_3.getLiveContainers()) {
       a.completedContainer(clusterResource, app_3, node_0, rmContainer, 
-          null, RMContainerEventType.KILL);
+          null, RMContainerEventType.KILL, null);
     }
     assertEquals(0*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -959,7 +959,8 @@ public class TestLeafQueue {
     
     // Now free 1 container from app_0 i.e. 1G
     a.completedContainer(clusterResource, app_0, node_0, 
-        app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+        app_0.getLiveContainers().iterator().next(), 
+        null, RMContainerEventType.KILL, null);
     a.assignContainers(clusterResource, node_0);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -971,7 +972,8 @@ public class TestLeafQueue {
 
     // Now finish another container from app_0 and fulfill the reservation
     a.completedContainer(clusterResource, app_0, node_0, 
-        app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+        app_0.getLiveContainers().iterator().next(), 
+        null, RMContainerEventType.KILL, null);
     a.assignContainers(clusterResource, node_0);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1069,7 +1071,8 @@ public class TestLeafQueue {
 
     // Now free 1 container from app_0 and try to assign to node_0
     a.completedContainer(clusterResource, app_0, node_0,
-        app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+        app_0.getLiveContainers().iterator().next(), 
+        null, RMContainerEventType.KILL, null);
     a.assignContainers(clusterResource, node_0);
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1160,7 +1163,8 @@ public class TestLeafQueue {
     
     // Now free 1 container from app_0 i.e. 1G, and re-reserve it
     a.completedContainer(clusterResource, app_0, node_0, 
-        app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+        app_0.getLiveContainers().iterator().next(), 
+        null, RMContainerEventType.KILL, null);
     a.assignContainers(clusterResource, node_0);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1191,7 +1195,8 @@ public class TestLeafQueue {
     
     // Now finish another container from app_0 and see the reservation cancelled
     a.completedContainer(clusterResource, app_0, node_0, 
-        app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+        app_0.getLiveContainers().iterator().next(), 
+        null, RMContainerEventType.KILL, null);
     CSAssignment assignment = a.assignContainers(clusterResource, node_0);
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());


Reply via email to