Repository: hadoop
Updated Branches:
  refs/heads/trunk 0126cf16b -> 179cab81e


YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with queue and 
headroom checks. (Tsuyoshi Ozawa via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/179cab81
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/179cab81
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/179cab81

Branch: refs/heads/trunk
Commit: 179cab81e0bde1af0cba6131ccccf16ff127358a
Parents: 0126cf1
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Oct 30 00:29:07 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Oct 30 00:29:07 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FairScheduler.java           |   1 +
 .../TestWorkPreservingRMRestart.java            | 144 ++++++++++++++-----
 3 files changed, 114 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/179cab81/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2369d50..f4e4afa 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -56,6 +56,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2742. FairSchedulerConfiguration should allow extra spaces
     between value and unit. (Wei Yan via kasha)
 
+    YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with
+    queue and headroom checks. (Tsuyoshi Ozawa via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/179cab81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index d633981..3fc3019 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -305,6 +305,7 @@ public class FairScheduler extends
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
+    updateRootQueueMetrics();
 
     if (LOG.isDebugEnabled()) {
       if (--updatesToSkipForDebug < 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/179cab81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 80be22b..85d3895 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -47,6 +49,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -65,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -148,6 +154,9 @@ public class TestWorkPreservingRMRestart {
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
     rm1 = new MockRM(conf, memStore);
+    if (schedulerClass.equals(FairScheduler.class)) {
+      initFairScheduler(rm1);
+    }
     rm1.start();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
@@ -160,6 +169,9 @@ public class TestWorkPreservingRMRestart {
 
     // Re-start RM
     rm2 = new MockRM(conf, memStore);
+    if (schedulerClass.equals(FairScheduler.class)) {
+      initFairScheduler(rm2);
+    }
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     // recover app
@@ -227,7 +239,9 @@ public class TestWorkPreservingRMRestart {
     if (schedulerClass.equals(CapacityScheduler.class)) {
       checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
     } else if (schedulerClass.equals(FifoScheduler.class)) {
-      checkFifoQueue(schedulerApp, usedResources, availableResources);
+      checkFifoQueue(rm2, schedulerApp, usedResources, availableResources);
+    } else if (schedulerClass.equals(FairScheduler.class)) {
+      checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
     }
 
     // *********** check scheduler attempt state.********
@@ -239,11 +253,6 @@ public class TestWorkPreservingRMRestart {
       scheduler.getRMContainer(runningContainer.getContainerId())));
     assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
 
-    // Until YARN-1959 is resolved
-    if (scheduler.getClass() != FairScheduler.class) {
-      assertEquals(availableResources, schedulerAttempt.getHeadroom());
-    }
-
     // *********** check appSchedulingInfo state ***********
     assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
   }
@@ -253,23 +262,28 @@ public class TestWorkPreservingRMRestart {
       Resource clusterResource, Resource queueResource, Resource usedResource,
       int numContainers)
       throws Exception {
-    checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource,
-      numContainers);
+    checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
+        numContainers);
 
     LeafQueue queue = (LeafQueue) app.getQueue();
-    Resource availableResources = Resources.subtract(queueResource, usedResource);
+    Resource availableResources =
+        Resources.subtract(queueResource, usedResource);
+    // ************ check app headroom ****************
+    SchedulerApplicationAttempt schedulerAttempt = app.getCurrentAppAttempt();
+    assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
     // ************* check Queue metrics ************
     QueueMetrics queueMetrics = queue.getMetrics();
-    asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
-      availableResources.getVirtualCores(), usedResource.getMemory(),
-      usedResource.getVirtualCores());
+    assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+        availableResources.getVirtualCores(), usedResource.getMemory(),
+        usedResource.getVirtualCores());
 
     // ************ check user metrics ***********
     QueueMetrics userMetrics =
         queueMetrics.getUserMetrics(app.getUser());
-    asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
-      availableResources.getVirtualCores(), usedResource.getMemory(),
-      usedResource.getVirtualCores());
+    assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+        availableResources.getVirtualCores(), usedResource.getMemory(),
+        usedResource.getVirtualCores());
   }
 
   private void checkCSLeafQueue(MockRM rm,
@@ -297,9 +311,10 @@ public class TestWorkPreservingRMRestart {
       .getTotalConsumedResources());
   }
 
-  private void checkFifoQueue(SchedulerApplication schedulerApp,
-      Resource usedResources, Resource availableResources) throws Exception {
-    FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler();
+  private void checkFifoQueue(ResourceManager rm,
+      SchedulerApplication  schedulerApp, Resource usedResources,
+      Resource availableResources) throws Exception {
+    FifoScheduler scheduler = (FifoScheduler) rm.getResourceScheduler();
     // ************ check cluster used Resources ********
     assertEquals(usedResources, scheduler.getUsedResource());
 
@@ -310,9 +325,68 @@ public class TestWorkPreservingRMRestart {
 
     // ************ check queue metrics ****************
     QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
-    asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
-      availableResources.getVirtualCores(), usedResources.getMemory(),
-      usedResources.getVirtualCores());
+    assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+        availableResources.getVirtualCores(), usedResources.getMemory(),
+        usedResources.getVirtualCores());
+  }
+
+  private void checkFSQueue(ResourceManager rm,
+      SchedulerApplication  schedulerApp, Resource usedResources,
+      Resource availableResources) throws Exception {
+    // waiting for RM's scheduling apps
+    int retry = 0;
+    Resource assumedFairShare = Resource.newInstance(8192, 8);
+    while (true) {
+      Thread.sleep(100);
+      if (assumedFairShare.equals(((FairScheduler)rm.getResourceScheduler())
+          .getQueueManager().getRootQueue().getFairShare())) {
+        break;
+      }
+      retry++;
+      if (retry > 30) {
+        Assert.fail("Apps are not scheduled within assumed timeout");
+      }
+    }
+
+    FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
+    FSParentQueue root = scheduler.getQueueManager().getRootQueue();
+    // ************ check cluster used Resources ********
+    assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
+    assertEquals(usedResources,root.getResourceUsage());
+
+    // ************ check app headroom ****************
+    FSAppAttempt schedulerAttempt =
+        (FSAppAttempt) schedulerApp.getCurrentAppAttempt();
+    assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
+    // ************ check queue metrics ****************
+    QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
+    assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+        availableResources.getVirtualCores(), usedResources.getMemory(),
+        usedResources.getVirtualCores());
+  }
+
+  private void initFairScheduler(ResourceManager rm) throws IOException {
+    FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
+    String testDir =
+        new File(
+            System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
+    String allocFile = new File(testDir, "test-queues").getAbsolutePath();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
+
+    PrintWriter out = new PrintWriter(new FileWriter(allocFile));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
+    out.println("<queue name=\"root\">");
+    out.println("  <schedulingPolicy>drf</schedulingPolicy>");
+    out.println("  <weight>1.0</weight>");
+    out.println("  <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
+    out.println("  <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
+    out.println("  <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
   }
 
   // create 3 container reports for AM
@@ -462,9 +536,10 @@ public class TestWorkPreservingRMRestart {
     checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
       q1UsedResource, 4);
     QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
-    asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4,
-      q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
-      q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
+    assertMetrics(queue1Metrics, 2, 0, 2, 0, 4,
+        q1availableResources.getMemory(),
+        q1availableResources.getVirtualCores(), q1UsedResource.getMemory(),
+        q1UsedResource.getVirtualCores());
 
     // assert queue B state.
     SchedulerApplication schedulerApp2 =
@@ -472,19 +547,20 @@ public class TestWorkPreservingRMRestart {
     checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
       q2UsedResource, 2);
     QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
-    asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2,
-      q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
-      q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
+    assertMetrics(queue2Metrics, 1, 0, 1, 0, 2,
+        q2availableResources.getMemory(),
+        q2availableResources.getVirtualCores(), q2UsedResource.getMemory(),
+        q2UsedResource.getVirtualCores());
 
     // assert parent queue state.
     LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
     ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
     checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
       (float) 6 / 16);
-    asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
-      totalAvailableResource.getMemory(),
-      totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
-      totalUsedResource.getVirtualCores());
+    assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
+        totalAvailableResource.getMemory(),
+        totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
+        totalUsedResource.getVirtualCores());
   }
   
   //Test that we receive a meaningful exit-causing exception if a queue
@@ -818,7 +894,7 @@ public class TestWorkPreservingRMRestart {
     }, 1000, 20000);
   }
 
-  private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
+  private void assertMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,
       int allocatedMB, int allocatedVirtualCores) {

Reply via email to