Author: shv
Date: Fri Jun 29 19:05:45 2012
New Revision: 1355514

URL: http://svn.apache.org/viewvc?rev=1355514&view=rev
Log:
MAPREDUCE-4360. Capacity Scheduler Hierarchical leaf queue does not honor the 
max capacity of container queue. Contributed by Mayank Bansal.
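
For readers skimming the patch: the fix makes a leaf queue respect the max capacity of its enclosing container queue. Before handing out another slot, the scheduler now walks up the parent QueueSchedulingContext chain to the nearest ancestor that actually defines a max capacity, and refuses the assignment if that ancestor is already at its limit; the new parentQSC field and constructor exist to make that walk possible. The sketch below only illustrates the idea under those assumptions -- the class, field, and method names (Queue, slotsOccupied, exceedsAncestorMaxCapacity) are hypothetical and are not the scheduler's real API.

// Minimal, self-contained illustration of the ancestor max-capacity check.
// Names here are made up for clarity; they do not match CapacityTaskScheduler.
class MaxCapacitySketch {

  /** Hypothetical stand-in for a queue's scheduling context. */
  static final class Queue {
    final String name;
    final Queue parent;       // null for the root queue
    final int maxCapacity;    // in slots; -1 means no explicit limit
    int slotsOccupied;        // slots currently in use by this queue's subtree

    Queue(String name, Queue parent, int maxCapacity) {
      this.name = name;
      this.parent = parent;
      this.maxCapacity = maxCapacity;
    }
  }

  /**
   * Returns true if giving 'leaf' one more task of 'slotsPerTask' slots would
   * push the nearest ancestor that defines a max capacity over its limit.
   */
  static boolean exceedsAncestorMaxCapacity(Queue leaf, int slotsPerTask) {
    Queue ancestor = leaf.parent;
    // Walk up past ancestors that do not define a max capacity (-1).
    while (ancestor != null && ancestor.maxCapacity < 0) {
      ancestor = ancestor.parent;
    }
    if (ancestor == null) {
      return false;           // no limiting ancestor above this leaf
    }
    return ancestor.slotsOccupied + slotsPerTask > ancestor.maxCapacity;
  }

  public static void main(String[] args) {
    // Mirrors the shape of the new test: rt has no limit, rt.sch caps its
    // subtree at 4 slots, and rt.sch.prod is the leaf that submits work.
    Queue rt = new Queue("rt", null, -1);
    Queue sch = new Queue("rt.sch", rt, 4);
    Queue prod = new Queue("rt.sch.prod", sch, -1);

    sch.slotsOccupied = 4;    // the container queue is already at its cap
    System.out.println(sch.name + " blocks further assignment to " + prod.name
        + ": " + exceedsAncestorMaxCapacity(prod, 1));   // prints true
  }
}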

Modified:
    hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt
    hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
    hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java
    hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java

Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1355514&r1=1355513&r2=1355514&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Fri Jun 29 19:05:45 2012
@@ -83,6 +83,9 @@ Release 0.22.1 - Unreleased
     MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files
     get deleted from task tracker. (Mayank Bansal via shv)
 
+    MAPREDUCE-4360. Capacity Scheduler Hierarchical leaf queue does not honor
+    the max capacity of container queue. (Mayank Bansal via shv)
+
 Release 0.22.0 - 2011-11-29
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1355514&r1=1355513&r2=1355514&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Jun 29 19:05:45 2012
@@ -540,11 +540,31 @@ class CapacityTaskScheduler extends Task
           }
           return true;
         }
+      } else {
+        QueueSchedulingContext qscParent = qsc.getParentQSC();
+        while (qscParent != null) {
+          if (getTSC(qscParent).getMaxCapacity() < 0) {
+            qscParent = qscParent.getParentQSC();
+          } else {
+            break;
+          }
+        }
+        if (qscParent != null) {
+          TaskSchedulingContext tsiParent = getTSC(qscParent);
+          if ((tsiParent.getNumSlotsOccupied() + noOfSlotsPerTask) > tsiParent
+              .getMaxCapacity()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queue " + qscParent.getQueueName() + " "
+                  + "has reached its  max " + type + "Capacity");
+              LOG.debug("Current running tasks " + tsiParent.getCapacity());
+            }
+            return true;
+          }
+        }
       }
       return false;
     }
 
-
     // for debugging.
     private void printQSCs() {
       if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java?rev=1355514&r1=1355513&r2=1355514&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java Fri Jun 29 19:05:45 2012
@@ -102,9 +102,8 @@ class QueueHierarchyBuilder {
         if (childQueues != null && childQueues.size() > 0) {
           //generate a new ContainerQueue and recursively
           //create hierarchy.
-          AbstractQueue cq =
-              new ContainerQueue(parent, loadContext(qs.getProperties(),
-                  qs.getQueueName(), schedConfig));
+          AbstractQueue cq = new ContainerQueue(parent, loadContext(
+              qs.getProperties(), qs.getQueueName(), schedConfig, parent));
           //update totalCapacity
           totalCapacity += cq.qsc.getCapacityPercent();
           LOG.info("Created a ContainerQueue " + qs.getQueueName()
@@ -115,9 +114,8 @@ class QueueHierarchyBuilder {
           //if not this is a JobQueue.
 
           //create a JobQueue.
-          AbstractQueue jq =
-              new JobQueue(parent, loadContext(qs.getProperties(),
-                  qs.getQueueName(), schedConfig));
+          AbstractQueue jq = new JobQueue(parent, loadContext(
+              qs.getProperties(), qs.getQueueName(), schedConfig, parent));
           totalCapacity += jq.qsc.getCapacityPercent();
           LOG.info("Created a jobQueue " + qs.getQueueName()
               + " and added it as a child to " + parent.getName());
@@ -151,7 +149,7 @@ class QueueHierarchyBuilder {
    * @return the generated {@link QueueSchedulingContext} object
    */
   private QueueSchedulingContext loadContext(Properties props,
-      String queueName, CapacitySchedulerConf schedConf) {
+      String queueName, CapacitySchedulerConf schedConf, AbstractQueue parent) {
     schedConf.setProperties(queueName,props);
     float capacity = schedConf.getCapacity(queueName);
     float stretchCapacity = schedConf.getMaxCapacity(queueName);
@@ -160,8 +158,8 @@ class QueueHierarchyBuilder {
     }
     int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
     // create our QSC and add to our hashmap
-    QueueSchedulingContext qsi = new QueueSchedulingContext(
-      queueName, capacity, stretchCapacity, ulMin
+    QueueSchedulingContext qsi = new QueueSchedulingContext(queueName,
+        capacity, stretchCapacity, ulMin, parent.qsc
     );
     qsi.setSupportsPriorities(
       schedConf.isPrioritySupported(
@@ -178,7 +176,7 @@ class QueueHierarchyBuilder {
    */
   static AbstractQueue createRootAbstractQueue() {
     QueueSchedulingContext rootContext =
-        new QueueSchedulingContext("", 100, -1, -1);
+        new QueueSchedulingContext("", 100, -1, -1, null);
     AbstractQueue root = new ContainerQueue(null, rootContext);
     return root;
   }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java?rev=1355514&r1=1355513&r2=1355514&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java Fri Jun 29 19:05:45 2012
@@ -97,6 +97,12 @@ public class QueueSchedulingContext {
      */
     private TaskSchedulingContext mapTSC;
     private TaskSchedulingContext reduceTSC;
+    
+  /**
+   * For Hierarchical queues we need to have the parent context for the child
+   * queue.
+   */
+  private QueueSchedulingContext parentQSC;
 
   QueueSchedulingContext(
     String queueName, float capacityPercent, float maxCapacityPercent,
@@ -108,7 +114,18 @@ public class QueueSchedulingContext {
     this.setMapTSC(new TaskSchedulingContext());
     this.setReduceTSC(new TaskSchedulingContext());
   }
-
+  
+  QueueSchedulingContext(String queueName, float capacityPercent,
+      float maxCapacityPercent, int ulMin, QueueSchedulingContext parentQSC) {
+    this.setQueueName(queueName);
+    this.setCapacityPercent(capacityPercent);
+    this.setMaxCapacityPercent(maxCapacityPercent);
+    this.setUlMin(ulMin);
+    this.setMapTSC(new TaskSchedulingContext());
+    this.setReduceTSC(new TaskSchedulingContext());
+    this.setParentQSC(parentQSC);
+  }
+  
   /**
      * return information about the queue
      *
@@ -282,4 +299,23 @@ public class QueueSchedulingContext {
     prevMapCapacity = getMapCapacity();
     prevReduceCapacity = getReduceCapacity();
   }
+
+  /**
+   * This method returns the parent QSC object.
+   * 
+   * @return parent QSC for this queue
+   */
+  public QueueSchedulingContext getParentQSC() {
+    return parentQSC;
+  }
+
+  /**
+   * Setting the parent QSC object for this queue.
+   * 
+   * @param parentQSC
+   *          Setting the parent QSC object
+   */
+  public void setParentQSC(QueueSchedulingContext parentQSC) {
+    this.parentQSC = parentQSC;
+  }
 }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=1355514&r1=1355513&r2=1355514&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Fri Jun 29 19:05:45 2012
@@ -23,6 +23,10 @@ import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
 import java.io.IOException;
+
+import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
 import static org.apache.hadoop.mapred.CapacityTestUtils.*;
 
 public class TestContainerQueue extends TestCase {
@@ -236,10 +240,10 @@ public class TestContainerQueue extends 
     // Create 2 levels of hierarchy.
 
     //Firt level
-    QueueSchedulingContext sch =
-      new QueueSchedulingContext("rt.sch", a, -1, -1);
-    QueueSchedulingContext gta =
-      new QueueSchedulingContext("rt.gta", b, -1, -1);
+    QueueSchedulingContext sch = new QueueSchedulingContext("rt.sch", a, -1,
+        -1, rt.qsc);
+    QueueSchedulingContext gta = new QueueSchedulingContext("rt.gta", b, -1,
+        -1, rt.qsc);
 
     AbstractQueue schq = new ContainerQueue(rt, sch);
 
@@ -249,11 +253,11 @@ public class TestContainerQueue extends 
     map.put(gtaq.getName(), gtaq);
     scheduler.jobQueuesManager.addQueue((JobQueue) gtaq);
 
-    //Create further children.
-    QueueSchedulingContext prod =
-      new QueueSchedulingContext("rt.sch.prod", c, -1, -1);
-    QueueSchedulingContext misc =
-      new QueueSchedulingContext("rt.sch.misc", d, -1, -1);
+    // Create further children.
+    QueueSchedulingContext prod = new QueueSchedulingContext("rt.sch.prod", c,
+        -1, -1, sch);
+    QueueSchedulingContext misc = new QueueSchedulingContext("rt.sch.misc", d,
+        -1, -1, sch);
 
     AbstractQueue prodq = new JobQueue(schq, prod);
     AbstractQueue miscq = new JobQueue(schq, misc);
@@ -265,6 +269,101 @@ public class TestContainerQueue extends 
     return map;
   }
 
+  public void testMaxCapacityContainerQueuehonuredInchildqueue()
+      throws IOException {
+    this.setUp(8, 1, 1);
+    taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
+    // set up some queues
+    Map<String, AbstractQueue> map = setUpHierarchy(50, 50, 50, 50);
+    scheduler.updateContextInfoForTests();
+    // verify initial capacity distribution
+    TaskSchedulingContext mapTsc = map.get("rt.gta")
+        .getQueueSchedulingContext().getMapTSC();
+    assertEquals(mapTsc.getCapacity(), 4);
+
+    mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
+    assertEquals(mapTsc.getCapacity(), 4);
+
+    mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
+    assertEquals(mapTsc.getCapacity(), 2);
+
+    mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
+    assertEquals(mapTsc.getCapacity(), 2);
+
+    assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
+        "rt.sch.misc" }, new int[] { 0, 0, 0, 0 });
+
+    map.get("rt.sch").getQueueSchedulingContext().setMaxCapacityPercent(50.0f);
+    map.get("rt.sch").getQueueSchedulingContext().getMapTSC().setMaxCapacity(4);
+    map.get("rt.sch").getQueueSchedulingContext().getReduceTSC()
+        .setMaxCapacity(4);
+
+    // Only Allow job submission to leaf queue
+    FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, 5,
+        5, "rt.sch.prod", "u1");
+    taskTrackerManager.initJob(fjob1);
+
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.clear();
+    expectedStrings.put(CapacityTestUtils.MAP,
+        "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(CapacityTestUtils.REDUCE,
+        "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> task1 = checkMultipleTaskAssignment(taskTrackerManager,
+        scheduler, "tt1", expectedStrings);
+    assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
+        "rt.sch.misc" }, new int[] { 0, 1, 1, 0 });
+
+    expectedStrings.clear();
+    expectedStrings.put(CapacityTestUtils.MAP,
+        "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(CapacityTestUtils.REDUCE,
+        "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt2",
+        expectedStrings);
+    assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
+        "rt.sch.misc" }, new int[] { 0, 2, 2, 0 });
+
+    expectedStrings.clear();
+    expectedStrings.put(CapacityTestUtils.MAP,
+        "attempt_test_0001_m_000003_0 on tt3");
+    expectedStrings.put(CapacityTestUtils.REDUCE,
+        "attempt_test_0001_r_000003_0 on tt3");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt3",
+        expectedStrings);
+    assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
+        "rt.sch.misc" }, new int[] { 0, 3, 3, 0 });
+
+    expectedStrings.clear();
+    expectedStrings.put(CapacityTestUtils.MAP,
+        "attempt_test_0001_m_000004_0 on tt4");
+    expectedStrings.put(CapacityTestUtils.REDUCE,
+        "attempt_test_0001_r_000004_0 on tt4");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt4",
+        expectedStrings);
+    assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
+        "rt.sch.misc" }, new int[] { 0, 4, 4, 0 });
+
+    // we have already reached the limit
+    // this call would return null
+    List<Task> task5 = scheduler.assignTasks(tracker("tt5"));
+    assertNull(task5);
+
+    // Now complete the task 1 i.e map task.
+    for (Task task : task1) {
+      taskTrackerManager.finishTask(task.getTaskID().toString(), fjob1);
+    }
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt1");
+    task5 = checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+  }
+  
+  protected TaskTracker tracker(String taskTrackerName) {
+           return taskTrackerManager.getTaskTracker(taskTrackerName);
+         }
+  
   /**
    * Verifies that capacities are allocated properly in hierarchical queues.
    * 

