Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 bade7f06e -> 63bb91274


YARN-4610. Reservations continue looking for one app causes other apps to 
starve. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63bb9127
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63bb9127
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63bb9127

Branch: refs/heads/branch-2.7
Commit: 63bb9127496c12c91ad04c508507a1c167daba44
Parents: bade7f0
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 21 18:35:24 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 21 18:35:24 2016 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/LeafQueue.java           |   6 +
 .../scheduler/capacity/TestReservations.java    | 142 ++++++++++++++++++-
 3 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bb9127/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e18b8e7..0f869e2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -69,6 +69,9 @@ Release 2.7.3 - UNRELEASED
     YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
     (sandflee via junping_du)
 
+    YARN-4610. Reservations continue looking for one app causes other apps to
+    starve (jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bb9127/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index e3b9808..e8daa88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -769,6 +769,9 @@ public class LeafQueue extends AbstractCSQueue {
       }
     }
     
+    Resource initAmountNeededUnreserve =
+        currentResourceLimits.getAmountNeededUnreserve();
+
     // Try to assign containers to applications in order
     for (FiCaSchedulerApp application : activeApplications) {
       
@@ -821,6 +824,9 @@ public class LeafQueue extends AbstractCSQueue {
               computeUserLimitAndSetHeadroom(application, clusterResource, 
                   required, requestedNodeLabels);          
           
+          currentResourceLimits.setAmountNeededUnreserve(
+              initAmountNeededUnreserve);
+
           // Check queue max-capacity limit
           if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
               currentResourceLimits, required, application.getCurrentReservation())) {

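In essence, the patch snapshots the mutable "amount needed to unreserve" value
once before the per-application loop and restores it for every application, so
a value set while evaluating one application's reservation cannot leak into the
limit checks for the next. A minimal sketch of that save/restore pattern (the
types and method names below are simplified stand-ins, not the actual LeafQueue
API):

    import java.util.List;

    // Simplified stand-ins for the scheduler types; illustrative only.
    class ResourceLimits {
      private long amountNeededUnreserve; // mutated as a side effect of limit checks
      long getAmountNeededUnreserve() { return amountNeededUnreserve; }
      void setAmountNeededUnreserve(long v) { amountNeededUnreserve = v; }
    }

    interface Application {
      // May set limits.amountNeededUnreserve while checking queue capacity,
      // meaning "this much must be unreserved before assignment can proceed".
      boolean tryAssign(ResourceLimits limits);
    }

    class AssignLoopSketch {
      void assignContainers(List<Application> apps, ResourceLimits limits) {
        // Snapshot the incoming value once, before iterating applications.
        final long initAmountNeededUnreserve = limits.getAmountNeededUnreserve();
        for (Application app : apps) {
          // Restore the snapshot for each application so one app's unreserve
          // requirement cannot leak into, and starve, the apps after it.
          limits.setAmountNeededUnreserve(initAmountNeededUnreserve);
          if (app.tryAssign(limits)) {
            return; // at most one assignment per scheduling pass
          }
        }
      }
    }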
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bb9127/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index e215e0c..29e2a88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -102,12 +102,17 @@ public class TestReservations {
   }
 
   private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
+    setup(csConf, false);
+  }
+
+  private void setup(CapacitySchedulerConfiguration csConf,
+      boolean addUserLimits) throws Exception {
 
     csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
     final String newRoot = "root" + System.currentTimeMillis();
     // final String newRoot = "root";
 
-    setupQueueConfiguration(csConf, newRoot);
+    setupQueueConfiguration(csConf, newRoot, addUserLimits);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
 
@@ -148,7 +153,7 @@ public class TestReservations {
   private static final String A = "a";
 
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
-      final String newRoot) {
+      final String newRoot, boolean addUserLimits) {
 
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT,
@@ -169,6 +174,10 @@ public class TestReservations {
     conf.setMaximumCapacity(Q_A, 100);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
 
+    if (addUserLimits) {
+      conf.setUserLimit(Q_A, 25);
+      conf.setUserLimitFactor(Q_A, 0.25f);
+    }
   }
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -354,6 +363,135 @@ public class TestReservations {
     assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
   }
 
+  // Test that hitting a reservation limit and needing to unreserve
+  // does not affect assigning containers for other users
+  @Test
+  public void testReservationLimitOtherUsers() throws Exception {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf, true);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    app_0 = spy(app_0);
+    rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    app_1 = spy(app_1);
+    rmContext.getRMApps().put(app_1.getApplicationId(), mock(RMApp.class));
+
+    a.submitApplicationAttempt(app_1, user_1);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    cs.getAllNodes().put(node_0.getNodeID(), node_0);
+    cs.getAllNodes().put(node_1.getNodeID(), node_1);
+    cs.getAllNodes().put(node_2.getNodeID(), node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_1.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource));
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource));
+    assertEquals(4 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(20 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(2 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Add a few requests to each app
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
+            priorityMap, recordFactory)));
+    app_1.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // add a reservation for app_0
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource));
+    assertEquals(12 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(12 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(2 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // next assignment is beyond user limit for user_0 but it should assign to
+    // app_1 for user_1
+    a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource));
+    assertEquals(14 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(6 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(10 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(4 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+  }
+
   @Test
   public void testReservationNoContinueLook() throws Exception {
     // Test that with reservations-continue-look-all-nodes feature off
