Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 9cc7ab4e9 -> 93165c1ad


YARN-4963. capacity scheduler: Make number of OFF_SWITCH assignments per 
heartbeat configurable. Contributed by Nathan Roberts


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/93165c1a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/93165c1a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/93165c1a

Branch: refs/heads/branch-2.8
Commit: 93165c1aded6046ee7e6920133348444dcce6899
Parents: 9cc7ab4
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 28 17:44:56 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 28 17:44:56 2016 +0000

----------------------------------------------------------------------
 .../conf/capacity-scheduler.xml                 | 12 +++++++
 .../CapacitySchedulerConfiguration.java         | 21 +++++++++++++
 .../scheduler/capacity/ParentQueue.java         | 33 +++++++++++++++-----
 .../scheduler/capacity/TestParentQueue.java     | 32 ++++++++++++-------
 4 files changed, 79 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/93165c1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 30f4eb9..6ac726e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -131,4 +131,16 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
+    <value>1</value>
+    <description>
+      Controls the number of OFF_SWITCH assignments allowed
+      during a node's heartbeat. Increasing this value can improve
+      scheduling rate for OFF_SWITCH containers. Lower values reduce
+      "clumping" of applications on particular nodes. The default is 1.
+      Legal values are 1-MAX_INT. This config is refreshable.
+    </description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93165c1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index c7fe939..0ab908a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -191,6 +191,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       PREFIX + "rack-locality-full-reset";
 
   @Private
+  public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1;
+
+  @Private
+  public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT =
+      PREFIX + "per-node-heartbeat.maximum-offswitch-assignments";
+
+  @Private
   public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
 
   @Private
@@ -707,6 +714,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
   }
 
+  public int getOffSwitchPerHeartbeatLimit() {
+    int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT,
+        DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT);
+    if (limit < 1) {
+      LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + "(" + limit + ") < 1. Using 1.");
+      limit = 1;
+    }
+    return limit;
+  }
+
+  public void setOffSwitchPerHeartbeatLimit(int limit) {
+    setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit);
+  }
+
   public int getNodeLocalityDelay() {
     return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93165c1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index c5f1e11..29c032b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -76,6 +76,7 @@ public class ParentQueue extends AbstractCSQueue {
   volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
   private boolean needToResortQueuesAtNextAllocation = false;
+  private int offswitchPerHeartbeatLimit;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -123,6 +124,9 @@ public class ParentQueue extends AbstractCSQueue {
       }
     }
 
+    offswitchPerHeartbeatLimit =
+        csContext.getConfiguration().getOffSwitchPerHeartbeatLimit();
+
     LOG.info(queueName +
         ", capacity=" + this.queueCapacities.getCapacity() +
         ", asboluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
@@ -130,7 +134,8 @@ public class ParentQueue extends AbstractCSQueue {
         ", asboluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() + 
         ", state=" + state +
         ", acls=" + aclsString + 
-        ", labels=" + labelStrBuilder.toString() + "\n" +
+        ", labels=" + labelStrBuilder.toString() +
+        ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit() +
         ", reservationsContinueLooking=" + reservationsContinueLooking);
   }
 
@@ -196,6 +201,11 @@ public class ParentQueue extends AbstractCSQueue {
     return queueInfo;
   }
 
+  @Private
+  public int getOffSwitchPerHeartbeatLimit() {
+    return offswitchPerHeartbeatLimit;
+  }
+
   private synchronized QueueUserACLInfo getUserAclInfo(
       UserGroupInformation user) {
     QueueUserACLInfo userAclInfo = 
@@ -383,6 +393,8 @@ public class ParentQueue extends AbstractCSQueue {
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode) {
+    int offswitchCount = 0;
+
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
@@ -478,13 +490,18 @@ public class ParentQueue extends AbstractCSQueue {
           + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
       }
 
-      // Do not assign more than one container if this isn't the root queue
-      // or if we've already assigned an off-switch container
-      if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
+      if (assignment.getType() == NodeType.OFF_SWITCH) {
+        offswitchCount++;
+      }
+
+      // Do not assign more containers if this isn't the root queue
+      // or if we've already assigned enough OFF_SWITCH containers in
+      // this pass
+      if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
         if (LOG.isDebugEnabled()) {
-          if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
-            LOG.debug("Not assigning more than one off-switch container," +
-                " assignments so far: " + assignment);
+          if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
+            LOG.debug("Assigned maximum number of off-switch containers: " +
+                offswitchCount + ", assignments so far: " + assignment);
           }
         }
         break;
@@ -821,4 +838,4 @@ public class ParentQueue extends AbstractCSQueue {
   public synchronized int getNumApplications() {
     return numApplications;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93165c1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index e3c04f8..1dc8d13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -611,6 +612,8 @@ public class TestParentQueue {
   public void testOffSwitchScheduling() throws Exception {
     // Setup queue configs
     setupSingleLevelQueues(csConf);
+    csConf.setOffSwitchPerHeartbeatLimit(2);
+
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
@@ -641,12 +644,18 @@ public class TestParentQueue {
     queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
     .incPending(Resources.createResource(1 * GB));
     
-    // Simulate B returning a container on node_0
-    stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
+    // Simulate returning 2 containers on node_0 before offswitch limit
+    stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verifyQueueMetrics(a, 0*GB, clusterResource);
+    InOrder allocationOrder = inOrder(a, b);
+    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+    verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
@@ -655,27 +664,28 @@ public class TestParentQueue {
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    InOrder allocationOrder = inOrder(a, b);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
+    allocationOrder = inOrder(a, b);
+    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     
     // Now, B should get the scheduling opportunity 
     // since A has 2/6G while B has 2/14G, 
-    // However, since B returns off-switch, A won't get an opportunity
+    // A also gets an opportunity because offswitchlimit not reached
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
+
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
+    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to