YARN-3920. FairScheduler container reservation on a node should be configurable 
to limit it to large containers (adhoot via asuresh)
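The knob introduced here resolves to yarn.scheduler.reservation-threshold.increment-multiple (default 2) and is applied as a multiple of the increment allocation. A minimal sketch of raising it programmatically, assuming the stock 1024 MB / 1 vcore increment allocation; the class name and the 4.0f value are illustrative only, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;

    public class ReservationThresholdConfSketch {
      public static Configuration build() {
        Configuration conf = new YarnConfiguration();
        // With a 1024 MB / 1 vcore increment, 4.0f yields a 4096 MB / 4 vcore
        // threshold; only requests at or above it (per the queue policy's
        // resource calculator) are allowed to reserve a node.
        conf.setFloat(
            FairSchedulerConfiguration
                .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
            4.0f);
        return conf;
      }
    }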


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94dec5a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94dec5a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94dec5a9

Branch: refs/heads/HDFS-7966
Commit: 94dec5a9164cd9bc573fbf74e76bcff9e7c5c637
Parents: 602335d
Author: Arun Suresh <asur...@apache.org>
Authored: Fri Sep 18 14:00:49 2015 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Fri Sep 18 14:02:55 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../scheduler/fair/FSAppAttempt.java            | 19 ++++-
 .../scheduler/fair/FairScheduler.java           | 21 ++++-
 .../fair/FairSchedulerConfiguration.java        | 22 ++++-
 .../scheduler/fair/FairSchedulerTestBase.java   |  6 ++
 .../scheduler/fair/TestFairScheduler.java       | 89 +++++++++++++++++++-
 6 files changed, 148 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/94dec5a9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 822624f..7487f71 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -454,6 +454,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup
     failure during commitJob. (Junping Du via wangda)
 
+    YARN-3920. FairScheduler container reservation on a node should be
+    configurable to limit it to large containers (adhoot via asuresh)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94dec5a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index cfec915..7af1891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -543,10 +543,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       return container.getResource();
     }
 
-    // The desired container won't fit here, so reserve
-    reserve(request.getPriority(), node, container, reserved);
+    if (isReservable(container)) {
+      // The desired container won't fit here, so reserve
+      reserve(request.getPriority(), node, container, reserved);
 
-    return FairScheduler.CONTAINER_RESERVED;
+      return FairScheduler.CONTAINER_RESERVED;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not creating reservation as container " + container.getId()
+            + " is not reservable");
+      }
+      return Resources.none();
+    }
+  }
+
+  private boolean isReservable(Container container) {
+    return scheduler.isAtLeastReservationThreshold(
+      getQueue().getPolicy().getResourceCalculator(), container.getResource());
   }
 
   private boolean hasNodeOrRackLocalRequests(Priority priority) {
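The isReservable() gate above defers to the scheduler using the ResourceCalculator of the queue's policy, so the comparison is calculator-dependent (a dominant-share comparison against the cluster resource under DRF). A small standalone sketch of that comparison, using the 4096 MB / 4 vcore node and the 2048 MB / 2 vcore threshold from the new test further down; the class name is illustrative and this assumes DRF's DominantResourceCalculator:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class IsReservableSketch {
      public static void main(String[] args) {
        ResourceCalculator rc = new DominantResourceCalculator();
        Resource cluster = Resources.createResource(4096, 4);
        Resource threshold = Resources.createResource(2048, 2);
        // 1024 MB / 1 vcore: dominant share 0.25 < 0.5, so no reservation.
        System.out.println(Resources.greaterThanOrEqual(
            rc, cluster, Resources.createResource(1024, 1), threshold)); // false
        // 1024 MB / 3 vcores: dominant share 0.75 >= 0.5, so a reservation is allowed.
        System.out.println(Resources.greaterThanOrEqual(
            rc, cluster, Resources.createResource(1024, 3), threshold)); // true
      }
    }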

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94dec5a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3a39799..a083272 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -194,7 +194,11 @@ public class FairScheduler extends
   private AllocationFileLoaderService allocsLoader;
   @VisibleForTesting
   AllocationConfiguration allocConf;
-  
+
+  // Container size threshold for making a reservation.
+  @VisibleForTesting
+  Resource reservationThreshold;
+
   public FairScheduler() {
     super(FairScheduler.class.getName());
     clock = new SystemClock();
@@ -203,6 +207,12 @@ public class FairScheduler extends
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }
 
+  public boolean isAtLeastReservationThreshold(
+      ResourceCalculator resourceCalculator, Resource resource) {
+    return Resources.greaterThanOrEqual(
+        resourceCalculator, clusterResource, resource, reservationThreshold);
+  }
+
   private void validateConf(Configuration conf) {
     // validate scheduler memory allocation setting
     int minMem = conf.getInt(
@@ -1325,6 +1335,7 @@ public class FairScheduler extends
       minimumAllocation = this.conf.getMinimumAllocation();
       initMaximumResourceCapability(this.conf.getMaximumAllocation());
       incrAllocation = this.conf.getIncrementAllocation();
+      updateReservationThreshold();
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
       continuousSchedulingSleepMs =
           this.conf.getContinuousSchedulingSleepMs();
@@ -1391,6 +1402,14 @@ public class FairScheduler extends
     }
   }
 
+  private void updateReservationThreshold() {
+    Resource newThreshold = Resources.multiply(
+        getIncrementResourceCapability(),
+        this.conf.getReservationThresholdIncrementMultiple());
+
+    reservationThreshold = newThreshold;
+  }
+
   private synchronized void startSchedulerThreads() {
     Preconditions.checkNotNull(updateThread, "updateThread is null");
     Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
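updateReservationThreshold() simply scales the increment allocation by the configured multiple. A short sketch of that arithmetic with the values used in the new test below (1024 MB / 1 vcore increment, multiple 2.0); the class name is illustrative:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class ThresholdMathSketch {
      public static void main(String[] args) {
        Resource increment = Resources.createResource(1024, 1);
        // Each dimension is scaled: 1024 MB * 2 = 2048 MB, 1 vcore * 2 = 2 vcores.
        Resource threshold = Resources.multiply(increment, 2.0);
        System.out.println(threshold); // <memory:2048, vCores:2>
      }
    }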

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94dec5a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index e477e6e..892484d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -49,7 +47,17 @@ public class FairSchedulerConfiguration extends Configuration {
   public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES =
     YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
   public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
-  
+
+  /** Threshold for container size for making a container reservation as a
+   * multiple of increment allocation. Only container sizes above this are
+   * allowed to reserve a node */
+  public static final String
+      RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE =
+      YarnConfiguration.YARN_PREFIX +
+          "scheduler.reservation-threshold.increment-multiple";
+  public static final float
+      DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f;
+
   private static final String CONF_PREFIX =  "yarn.scheduler.fair.";
 
   public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
@@ -166,7 +174,13 @@ public class FairSchedulerConfiguration extends Configuration {
       DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
     return Resources.createResource(incrementMemory, incrementCores);
   }
-  
+
+  public float getReservationThresholdIncrementMultiple() {
+    return getFloat(
+      RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+      DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE);
+  }
+
   public float getLocalityThresholdNode() {
     return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
   }
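For reference, YarnConfiguration.YARN_PREFIX is "yarn.", so the new key resolves to yarn.scheduler.reservation-threshold.increment-multiple. A sketch of reading it back when nothing is set, falling through to the 2f default declared above; the class name is illustrative:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;

    public class ThresholdKeySketch {
      public static void main(String[] args) {
        FairSchedulerConfiguration fsConf =
            new FairSchedulerConfiguration(new YarnConfiguration());
        // No value set in yarn-site.xml, so the 2f default applies.
        System.out.println(fsConf.getReservationThresholdIncrementMultiple()); // 2.0
      }
    }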

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94dec5a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 1c9801d..dd7ed41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -64,6 +64,7 @@ public class FairSchedulerTestBase {
   protected Configuration conf;
   protected FairScheduler scheduler;
   protected ResourceManager resourceManager;
+  public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
 
   // Helper methods
   public Configuration createConfiguration() {
@@ -76,6 +77,11 @@ public class FairSchedulerTestBase {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+
+    conf.setFloat(
+        FairSchedulerConfiguration
+           .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+        TEST_RESERVATION_THRESHOLD);
     return conf;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94dec5a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index a02cf18..ad54616 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -710,9 +710,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(updateEvent);
 
     // Asked for less than increment allocation.
-    assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+    assertEquals(
+        FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
         scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemory());
+            getResourceUsage().getMemory());
 
     NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(updateEvent2);
@@ -764,7 +765,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 2 is waiting with a reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-      getResourceUsage().getMemory());
+        getResourceUsage().getMemory());
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory());
 
     // Now another node checks in with capacity
@@ -939,8 +940,88 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         getResourceUsage().getMemory());
   }
 
-  
+  @Test
+  public void testReservationThresholdGatesReservations() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<defaultQueueSchedulingPolicy>drf" +
+        "</defaultQueueSchedulingPolicy>");
+    out.println("</allocations>");
+    out.close();
 
+    // Set threshold to 2 * 1024 ==> 2048 MB & 2 * 1 ==> 2 vcores (test will
+    // use vcores)
+    conf.setFloat(FairSchedulerConfiguration.
+            RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+        2f);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(4096, 4), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue 1 requests full capacity of node
+    createSchedulingRequest(4096, 4, "queue1", "user1", 1, 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 1 is allocated app capacity
+    assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Now queue 2 requests below threshold
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 2 has no reservation
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+    assertEquals(0,
+        scheduler.getSchedulerApp(attId).getReservedContainers().size());
+
+    // Now queue requests CPU above threshold
+    createSchedulingRequestExistingApplication(1024, 3, 1, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 2 is waiting with a reservation
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+    assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
+        .getVirtualCores());
+
+    // Now another node checks in with capacity
+    RMNode node2 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(1024, 4), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    scheduler.handle(updateEvent2);
+
+    // Make sure this goes to queue 2
+    assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getVirtualCores());
+
+    // The old reservation should still be there...
+    assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
+        .getVirtualCores());
+    // ... but it should disappear when we update the first node.
+    scheduler.handle(updateEvent);
+    assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation()
+        .getVirtualCores());
+  }
 
   @Test
   public void testEmptyQueueName() throws Exception {
