Author: acmurthy
Date: Mon May 20 21:08:36 2013
New Revision: 1484597

URL: http://svn.apache.org/r1484597
Log:
Merge -c 1484596 from branch-1 to branch-1.2 to fix MAPREDUCE-3859. Fix 
CapacityScheduler to correctly compute runtime queue limits for high-ram jobs. 
Contributed by Sergey Tryuber.

Modified:
    hadoop/common/branches/branch-1.2/CHANGES.txt
    hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
    hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1484597&r1=1484596&r2=1484597&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Mon May 20 21:08:36 2013
@@ -17,6 +17,9 @@ Release 1.2.1 - Unreleased 
     available for previous installs by putting it in hadoop-core.jar.
     (acmurthy)
 
+    MAPREDUCE-3859. Fix CapacityScheduler to correctly compute runtime queue
+    limits for high-ram jobs. (Sergey Tryuber via acmurthy)
+
 Release 1.2.0 - 2013.05.05
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1484597&r1=1484596&r2=1484597&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original)
+++ hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Mon May 20 21:08:36 2013
@@ -1157,7 +1157,7 @@ class CapacitySchedulerQueue {
     
     int queueSlotsOccupied = getNumSlotsOccupied(taskType);
     int currentCapacity;
-    if (queueSlotsOccupied < queueCapacity) {
+    if (queueSlotsOccupied + numSlotsRequested <= queueCapacity) {
       currentCapacity = queueCapacity;
     }
     else {

Modified: hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1484597&r1=1484596&r2=1484597&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon May 20 21:08:36 2013
@@ -702,12 +702,18 @@ public class TestCapacityScheduler exten
     float capacity;
     boolean supportsPrio;
     int ulMin;
+    Float ulFactor;
 
     public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
+      this(queueName, capacity, supportsPrio, ulMin, null);
+    }
+
+    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin, Float ulFactor) {
       this.queueName = queueName;
       this.capacity = capacity;
       this.supportsPrio = supportsPrio;
       this.ulMin = ulMin;
+      this.ulFactor = ulFactor;
     }
   }
   
@@ -733,18 +739,29 @@ public class TestCapacityScheduler exten
     /*public synchronized String getFirstQueue() {
       return firstQueue;
     }*/
-    
+
+    @Override
     public float getCapacity(String queue) {
       if(queueMap.get(queue).capacity == -1) {
         return super.getCapacity(queue);
       }
       return queueMap.get(queue).capacity;
     }
-    
+
+    @Override
     public int getMinimumUserLimitPercent(String queue) {
       return queueMap.get(queue).ulMin;
     }
-    
+
+    @Override
+    public float getUserLimitFactor(String queue) {
+      if(queueMap.get(queue).ulFactor != null) {
+        return queueMap.get(queue).ulFactor;
+      }
+      return super.getUserLimitFactor(queue);
+    }
+
+    @Override
     public boolean isPrioritySupported(String queue) {
       return queueMap.get(queue).supportsPrio;
     }
@@ -1333,6 +1350,88 @@ public class TestCapacityScheduler exten
   }
 
   /**
+   * Checks that a high-memory job is able to consume more slots than the
+   * queue's configured capacity, but not more than its maximum capacity
+   * (provided that user-limit-factor is configured to allow it).
+   */
+  public void testHighMemoryCanConsumeMaxCapacity() throws IOException {
+    //cluster with 20 map and 20 reduce slots
+    final int NUM_MAP_SLOTS = 4;
+    final int NUM_REDUCE_SLOTS = 4;
+    final int NUM_TASK_TRACKERS = 5;
+
+    taskTrackerManager =
+      new FakeTaskTrackerManager(NUM_TASK_TRACKERS, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
+
+    //Q1 capacity is 4*5*0.5=10 map and 4*5*0.5=10 reduce slots
+    final String Q1 = "q1";
+    final float Q1_CAP = 50.f;
+    final int Q1_ULMIN = 50;
+    final float Q1_ULFACTOR = 2;
+
+    //Q2 just to fill sum capacity up to 100%
+    final String Q2 = "q2";
+    final float Q2_CAP = 50.f;
+    final int Q2_ULMIN = 50;
+
+    taskTrackerManager.addQueues(new String[] { Q1, Q2 });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+
+
+    queues.add(new FakeQueueInfo(Q1, Q1_CAP, true, Q1_ULMIN, Q1_ULFACTOR));
+    queues.add(new FakeQueueInfo(Q2, Q2_CAP, true, Q2_ULMIN));
+    resConf.setFakeQueues(queues);
+
+    //q1 can go up to 4*5*0.8=16 map and 4*5*0.8=16 reduce slots
+    resConf.setMaxCapacity(Q1, 80.0f);
+
+    //configure and start scheduler
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        4 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        4 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    //submit high mem job with 5 mappers and 1 reducer with 4 slots each
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(4 * 1024);
+    jConf.setMemoryForReduceTask(4 * 1024);
+    jConf.setNumMapTasks(5);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName(Q1);
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    //tt1-tt4 are full (max capacity of q1 is 16 slots)
+    List<Task> tasks = checkAssignments("tt1",
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
+    List<Task> tasks2 = checkAssignments("tt2",
+        new String[] {"attempt_test_0001_m_000002_0 on tt2"});
+    List<Task> tasks3 = checkAssignments("tt3",
+            new String[] {"attempt_test_0001_m_000003_0 on tt3"});
+    List<Task> tasks4 = checkAssignments("tt4",
+            new String[] {"attempt_test_0001_m_000004_0 on tt4"});
+
+    assertTrue("Shouldn't assign more slots (reached max capacity)",
+        scheduler.assignTasks(tracker("tt5")).isEmpty());
+
+    checkOccupiedSlots(Q1, TaskType.MAP, 1, 16, 160.0f, 1, 0);
+    checkOccupiedSlots(Q1, TaskType.REDUCE, 1, 4, 40.0f, 0, 2);
+
+    //don't check 5th map task completeness. That's not this test case.
+  }
+
+  /**
    * Creates a queue with max capacity  of 50%
    * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
    * given to high ram job and are reserved , no other tasks are accepted .


Reply via email to