Author: sandy
Date: Tue Oct  1 19:57:40 2013
New Revision: 1528195

URL: http://svn.apache.org/r1528195
Log:
YARN-1010. FairScheduler: decouple container scheduling from nodemanager 
heartbeats. (Wei Yan via Sandy Ryza)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Oct  1 19:57:40 2013
@@ -11,6 +11,9 @@ Release 2.3.0 - UNRELEASED
 
     YARN-1021. Yarn Scheduler Load Simulator. (ywskycn via tucu)
 
+    YARN-1010. FairScheduler: decouple container scheduling from nodemanager
+    heartbeats. (Wei Yan via Sandy Ryza)
+
   IMPROVEMENTS
 
     YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Tue Oct  1 19:57:40 2013
@@ -310,10 +310,19 @@ public class AppSchedulable extends Sche
               + localRequest);
         }
         
-        NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
-            scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
-            scheduler.getRackLocalityThreshold());
-        
+        NodeType allowedLocality;
+        if (scheduler.isContinuousSchedulingEnabled()) {
+          allowedLocality = app.getAllowedLocalityLevelByTime(priority,
+                  scheduler.getNodeLocalityDelayMs(),
+                  scheduler.getRackLocalityDelayMs(),
+                  scheduler.getClock().getTime());
+        } else {
+          allowedLocality = app.getAllowedLocalityLevel(priority,
+                  scheduler.getNumClusterNodes(),
+                  scheduler.getNodeLocalityThreshold(),
+                  scheduler.getRackLocalityThreshold());
+        }
+
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
           return assignContainer(node, priority,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Tue Oct  1 19:57:40 2013
@@ -464,7 +464,12 @@ public class FSSchedulerApp extends Sche
    * @param priority The priority of the container scheduled.
    */
   synchronized public void resetSchedulingOpportunities(Priority priority) {
-    lastScheduledContainer.put(priority, System.currentTimeMillis());
+    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  }
+  // used for continuous scheduling
+  synchronized public void resetSchedulingOpportunities(Priority priority,
+                                                        long currentTimeMs) {
+    lastScheduledContainer.put(priority, currentTimeMs);
     schedulingOpportunities.setCount(priority, 0);
   }
 
@@ -513,6 +518,55 @@ public class FSSchedulerApp extends Sche
     return allowedLocalityLevel.get(priority);
   }
 
+  /**
+   * Return the level at which we are allowed to schedule containers.
+   * Given the thresholds indicating how much time passed before relaxing
+   * scheduling constraints.
+   */
+  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
+          long nodeLocalityDelayMs, long rackLocalityDelayMs,
+          long currentTimeMs) {
+
+    // if not being used, can schedule anywhere
+    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // default level is NODE_LOCAL
+    if (! allowedLocalityLevel.containsKey(priority)) {
+      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+      return NodeType.NODE_LOCAL;
+    }
+
+    NodeType allowed = allowedLocalityLevel.get(priority);
+
+    // if level is already most liberal, we're done
+    if (allowed.equals(NodeType.OFF_SWITCH)) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // check waiting time
+    long waitTime = currentTimeMs;
+    if (lastScheduledContainer.containsKey(priority)) {
+      waitTime -= lastScheduledContainer.get(priority);
+    } else {
+      waitTime -= appSchedulable.getStartTime();
+    }
+
+    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+            nodeLocalityDelayMs : rackLocalityDelayMs;
+
+    if (waitTime > thresholdTime) {
+      if (allowed.equals(NodeType.NODE_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      }
+    }
+    return allowedLocalityLevel.get(priority);
+  }
 
   synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
       Priority priority, ResourceRequest request,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Oct  1 19:57:40 2013
@@ -179,8 +179,12 @@ public class FairScheduler implements Re
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
+  protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
+  protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
+  protected long nodeLocalityDelayMs; // Delay for node locality
+  protected long rackLocalityDelayMs; // Delay for rack locality
   private FairSchedulerEventLog eventLog; // Machine-readable event log
   protected boolean assignMultiple; // Allocate multiple containers per
                                     // heartbeat
@@ -582,6 +586,22 @@ public class FairScheduler implements Re
     return rackLocalityThreshold;
   }
 
+  public long getNodeLocalityDelayMs() {
+    return nodeLocalityDelayMs;
+  }
+
+  public long getRackLocalityDelayMs() {
+    return rackLocalityDelayMs;
+  }
+
+  public boolean isContinuousSchedulingEnabled() {
+    return continuousSchedulingEnabled;
+  }
+
+  public synchronized int getContinuousSchedulingSleepMs() {
+    return continuousSchedulingSleepMs;
+  }
+
   public Resource getClusterCapacity() {
     return clusterCapacity;
   }
@@ -907,6 +927,37 @@ public class FairScheduler implements Re
           completedContainer, RMContainerEventType.FINISHED);
     }
 
+    if (continuousSchedulingEnabled) {
+      if (!completedContainers.isEmpty()) {
+        attemptScheduling(node);
+      }
+    } else {
+      attemptScheduling(node);
+    }
+  }
+
+  private void continuousScheduling() {
+    while (true) {
+      for (FSSchedulerNode node : nodes.values()) {
+        try {
+          if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
+            attemptScheduling(node);
+          }
+        } catch (Throwable ex) {
+          LOG.warn("Error while attempting scheduling for node " + node + ": " +
+                  ex.toString(), ex);
+        }
+      }
+      try {
+        Thread.sleep(getContinuousSchedulingSleepMs());
+      } catch (InterruptedException e) {
+        LOG.warn("Error while doing sleep in continuous scheduling: " +
+                e.toString(), e);
+      }
+    }
+  }
+  
+  private synchronized void attemptScheduling(FSSchedulerNode node) {
     // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
@@ -914,19 +965,18 @@ public class FairScheduler implements Re
     AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
       Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      if (reservedAppSchedulable != null &&
-          !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
         // Don't hold the reservation if app can no longer use it
         LOG.info("Releasing reservation that cannot be satisfied for application "
             + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node " + nm);
+            + " on node " + node);
         reservedAppSchedulable.unreserve(reservedPriority, node);
         reservedAppSchedulable = null;
       } else {
         // Reservation exists; try to fulfill the reservation
         LOG.info("Trying to fulfill reservation for application "
             + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node: " + nm);
+            + " on node: " + node);
 
         node.getReservedAppSchedulable().assignReservedContainer(node);
       }
@@ -1060,8 +1110,13 @@ public class FairScheduler implements Re
     maximumAllocation = this.conf.getMaximumAllocation();
     incrAllocation = this.conf.getIncrementAllocation();
     userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
+    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+    continuousSchedulingSleepMs =
+            this.conf.getContinuousSchedulingSleepMs();
     nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
     rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
     preemptionEnabled = this.conf.getPreemptionEnabled();
     assignMultiple = this.conf.getAssignMultiple();
     maxAssign = this.conf.getMaxAssign();
@@ -1088,6 +1143,21 @@ public class FairScheduler implements Re
       updateThread.setName("FairSchedulerUpdateThread");
       updateThread.setDaemon(true);
       updateThread.start();
+
+      if (continuousSchedulingEnabled) {
+        // start continuous scheduling thread
+        Thread schedulingThread = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              continuousScheduling();
+            }
+          }
+        );
+        schedulingThread.setName("ContinuousScheduling");
+        schedulingThread.setDaemon(true);
+        schedulingThread.start();
+      }
     } else {
       try {
         queueMgr.reloadAllocs();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Tue Oct  1 19:57:40 2013
@@ -66,6 +66,22 @@ public class FairSchedulerConfiguration 
   protected static final float  DEFAULT_LOCALITY_THRESHOLD_RACK =
                  DEFAULT_LOCALITY_THRESHOLD;
 
+  /** Delay for node locality. */
+  protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX + "locality-delay-node-ms";
+  protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;
+
+  /** Delay for rack locality. */
+  protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX + "locality-delay-rack-ms";
+  protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;
+
+  /** Enable continuous scheduling or not. */
+  protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
+  protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
+
+  /** Sleep time of each pass in continuous scheduling (5ms in default) */
+  protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
+  protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
+
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
@@ -134,6 +150,22 @@ public class FairSchedulerConfiguration 
     return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
   }
 
+  public boolean isContinuousSchedulingEnabled() {
+    return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
+  }
+
+  public int getContinuousSchedulingSleepMs() {
+    return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
+  }
+
+  public long getLocalityDelayNodeMs() {
+    return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
+  }
+
+  public long getLocalityDelayRackMs() {
+    return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
+  }
+
   public boolean getPreemptionEnabled() {
     return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java Tue Oct  1 19:57:40 2013
@@ -25,11 +25,25 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestFSSchedulerApp {
 
+  private class MockClock implements Clock {
+    private long time = 0;
+    @Override
+    public long getTime() {
+      return time;
+    }
+
+    public void tick(int seconds) {
+      time = time + seconds * 1000;
+    }
+
+  }
+
   private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
     ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
     ApplicationAttemptId attId =
@@ -94,6 +108,63 @@ public class TestFSSchedulerApp {
   }
 
   @Test
+  public void testDelaySchedulingForContinuousScheduling()
+          throws InterruptedException {
+    Queue queue = Mockito.mock(Queue.class);
+    Priority prio = Mockito.mock(Priority.class);
+    Mockito.when(prio.getPriority()).thenReturn(1);
+
+    MockClock clock = new MockClock();
+
+    long nodeLocalityDelayMs = 5 * 1000L;    // 5 seconds
+    long rackLocalityDelayMs = 6 * 1000L;    // 6 seconds
+
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    FSSchedulerApp schedulerApp =
+            new FSSchedulerApp(applicationAttemptId, "user1", queue,
+                    null, null);
+    AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class);
+    long startTime = clock.getTime();
+    Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime);
+    schedulerApp.setAppSchedulable(appSchedulable);
+
+    // Default level should be node-local
+    assertEquals(NodeType.NODE_LOCAL,
+            schedulerApp.getAllowedLocalityLevelByTime(prio,
+                    nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
+
+    // after 4 seconds should remain node local
+    clock.tick(4);
+    assertEquals(NodeType.NODE_LOCAL,
+            schedulerApp.getAllowedLocalityLevelByTime(prio,
+                    nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
+
+    // after 6 seconds should switch to rack local
+    clock.tick(2);
+    assertEquals(NodeType.RACK_LOCAL,
+            schedulerApp.getAllowedLocalityLevelByTime(prio,
+                    nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
+
+    // manually set back to node local
+    schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
+    schedulerApp.resetSchedulingOpportunities(prio, clock.getTime());
+    assertEquals(NodeType.NODE_LOCAL,
+            schedulerApp.getAllowedLocalityLevelByTime(prio,
+                    nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
+
+    // Now escalate again to rack-local, then to off-switch
+    clock.tick(6);
+    assertEquals(NodeType.RACK_LOCAL,
+            schedulerApp.getAllowedLocalityLevelByTime(prio,
+                    nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
+
+    clock.tick(7);
+    assertEquals(NodeType.OFF_SWITCH,
+            schedulerApp.getAllowedLocalityLevelByTime(prio,
+                    nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
+  }
+
+  @Test
   /**
    * Ensure that when negative paramaters are given (signaling delay scheduling
    * no tin use), the least restrictive locality level is returned.

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1528195&r1=1528194&r2=1528195&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Oct  1 19:57:40 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -298,6 +299,14 @@ public class TestFairScheduler {
     conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
     conf.setFloat(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5f);
     conf.setFloat(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7f);
+    conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+            true);
+    conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS,
+            10);
+    conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
+            5000);
+    conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
+            5000);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 
@@ -308,6 +317,11 @@ public class TestFairScheduler {
     Assert.assertEquals(true, scheduler.sizeBasedWeight);
     Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01);
     Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
+    Assert.assertTrue("The continuous scheduling should be enabled",
+            scheduler.continuousSchedulingEnabled);
+    Assert.assertEquals(10, scheduler.continuousSchedulingSleepMs);
+    Assert.assertEquals(5000, scheduler.nodeLocalityDelayMs);
+    Assert.assertEquals(5000, scheduler.rackLocalityDelayMs);
     Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory());
     Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory());
     Assert.assertEquals(128, 
@@ -2255,4 +2269,47 @@ public class TestFairScheduler {
         fs.applications, FSSchedulerApp.class);
   }
 
+  @Test
+  public void testContinuousScheduling() throws Exception {
+    // set continuous scheduling enabled
+    FairScheduler fs = new FairScheduler();
+    Configuration conf = createConfiguration();
+    conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+            true);
+    fs.reinitialize(conf, resourceManager.getRMContext());
+    Assert.assertTrue("Continuous scheduling should be enabled.",
+            fs.isContinuousSchedulingEnabled());
+
+    // Add one node
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    fs.handle(nodeEvent1);
+
+    // available resource
+    Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024);
+    Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8);
+
+    // send application request
+    ApplicationAttemptId appAttemptId =
+            createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    fs.addApplication(appAttemptId, "queue11", "user11");
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest request =
+            createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
+    ask.add(request);
+    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+
+    // waiting for continuous_scheduler_sleep_time
+    // at least one pass
+    Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
+
+    // check consumption
+    Resource consumption =
+            fs.applications.get(appAttemptId).getCurrentConsumption();
+    Assert.assertEquals(1024, consumption.getMemory());
+    Assert.assertEquals(1, consumption.getVirtualCores());
+  }
+
 }


Reply via email to