Author: tucu
Date: Fri May 31 17:26:18 2013
New Revision: 1488326

URL: http://svn.apache.org/r1488326
Log:
YARN-392. Make it possible to specify hard locality constraints in resource 
requests. (sandyr via tucu)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1488326&r1=1488325&r2=1488326&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri May 31 17:26:18 2013
@@ -96,6 +96,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
     via vinodkv)
 
+    YARN-392. Make it possible to specify hard locality constraints in resource
+    requests. (sandyr via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java?rev=1488326&r1=1488325&r2=1488326&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
 Fri May 31 17:26:18 2013
@@ -155,6 +155,44 @@ public abstract class ResourceRequest im
   @Stable
   public abstract void setNumContainers(int numContainers);
 
+  /**
+   * Get whether locality relaxation is enabled with this
+   * <code>ResourceRequest</code>. Defaults to true.
+   * 
+   * @return whether locality relaxation is enabled with this
+   * <code>ResourceRequest</code>.
+   */
+  @Public
+  @Stable
+  public abstract boolean getRelaxLocality();
+  
+  /**
+   * For a request at a network hierarchy level, set whether locality can be relaxed
+   * to that level and beyond.
+   * 
+   * If the flag is off on a rack-level <code>ResourceRequest</code>,
+   * containers at that request's priority will not be assigned to nodes on that
+   * request's rack unless requests specifically for those nodes have also been
+   * submitted.
+   * 
+   * If the flag is off on an {@link ResourceRequest#ANY}-level
+   * <code>ResourceRequest</code>, containers at that request's priority will
+   * only be assigned on racks for which specific requests have also been
+   * submitted.
+   * 
+   * For example, to request a container strictly on a specific node, the
+   * corresponding rack-level and any-level requests should have locality
+   * relaxation set to false.  Similarly, to request a container strictly on a
+   * specific rack, the corresponding any-level request should have locality
+   * relaxation set to false.
+   * 
+   * @param relaxLocality whether locality relaxation is enabled with this
+   * <code>ResourceRequest</code>.
+   */
+  @Public
+  @Stable
+  public abstract void setRelaxLocality(boolean relaxLocality);
+  
   @Override
   public int hashCode() {
     final int prime = 2153;

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java?rev=1488326&r1=1488325&r2=1488326&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
 Fri May 31 17:26:18 2013
@@ -146,6 +146,18 @@ public class ResourceRequestPBImpl exten
     maybeInitBuilder();
     builder.setNumContainers((numContainers));
   }
+  
+  @Override
+  public boolean getRelaxLocality() {
+    ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getRelaxLocality();
+  }
+
+  @Override
+  public void setRelaxLocality(boolean relaxLocality) {
+    maybeInitBuilder();
+    builder.setRelaxLocality(relaxLocality);
+  }
 
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
@@ -167,6 +179,7 @@ public class ResourceRequestPBImpl exten
   public String toString() {
     return "{Priority: " + getPriority() + ", Capability: " + getCapability()
         + ", # Containers: " + getNumContainers()
-        + ", Location: " + getHostName() + "}";
+        + ", Location: " + getHostName()
+        + ", Relax Locality: " + getRelaxLocality() + "}";
   }
 }
\ No newline at end of file

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1488326&r1=1488325&r2=1488326&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
 Fri May 31 17:26:18 2013
@@ -204,6 +204,7 @@ message ResourceRequestProto {
   optional string host_name = 2;
   optional ResourceProto capability = 3;
   optional int32 num_containers = 4;
+  optional bool relax_locality = 5 [default = true];
 }
 
 ////////////////////////////////////////////////////////////////////////

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1488326&r1=1488325&r2=1488326&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
 Fri May 31 17:26:18 2013
@@ -316,6 +316,11 @@ public class AppSchedulable extends Sche
         ResourceRequest localRequest = app.getResourceRequest(priority,
             node.getHostName());
         
+        if (localRequest != null && !localRequest.getRelaxLocality()) {
+          LOG.warn("Relax locality off is not supported on local request: "
+              + localRequest);
+        }
+        
         NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
             scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
             scheduler.getRackLocalityThreshold());
@@ -325,6 +330,10 @@ public class AppSchedulable extends Sche
           return assignContainer(node, priority,
               localRequest, NodeType.NODE_LOCAL, reserved);
         }
+        
+        if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
+          continue;
+        }
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
@@ -335,6 +344,10 @@ public class AppSchedulable extends Sche
 
         ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
             ResourceRequest.ANY);
+        if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
+          continue;
+        }
+        
         if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
             && allowedLocality.equals(NodeType.OFF_SWITCH)) {
           return assignContainer(node, priority, offSwitchRequest,
@@ -359,10 +372,23 @@ public class AppSchedulable extends Sche
    * given node, if the node had full space.
    */
   public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
-    // TODO: add checks stuff about node specific scheduling here
-    ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
-    return request.getNumContainers() > 0 && 
+    ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
+    ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
+    ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName());
+
+    return
+        // There must be outstanding requests at the given priority:
+        anyRequest != null && anyRequest.getNumContainers() > 0 &&
+        // If locality relaxation is turned off at *-level, there must be a
+        // non-zero request for the node's rack:
+        (anyRequest.getRelaxLocality() ||
+            (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
+        // If locality relaxation is turned off at rack-level, there must be a
+        // non-zero request at the node:
+        (rackRequest == null || rackRequest.getRelaxLocality() ||
+            (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
+        // The requested container must be able to fit on the node:
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
-            request.getCapability(), node.getRMNode().getTotalCapability());
+            anyRequest.getCapability(), node.getRMNode().getTotalCapability());
   }
 }

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1488326&r1=1488325&r2=1488326&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
 Fri May 31 17:26:18 2013
@@ -151,7 +151,7 @@ public class TestFairScheduler {
 
 
   private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers) {
+      int priority, int numContainers, boolean relaxLocality) {
     ResourceRequest request = 
recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(Resources.createResource(memory));
     request.setHostName(host);
@@ -159,6 +159,7 @@ public class TestFairScheduler {
     Priority prio = recordFactory.newRecordInstance(Priority.class);
     prio.setPriority(priority);
     request.setPriority(prio);
+    request.setRelaxLocality(relaxLocality);
     return request;
   }
 
@@ -182,7 +183,7 @@ public class TestFairScheduler {
     scheduler.addApplication(id, queueId, userId);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, 
ResourceRequest.ANY,
-        priority, numContainers);
+        priority, numContainers, true);
     ask.add(request);
     scheduler.allocate(id, ask,  new ArrayList<ContainerId>());
     return id;
@@ -190,9 +191,14 @@ public class TestFairScheduler {
   
   private void createSchedulingRequestExistingApplication(int memory, int 
priority,
       ApplicationAttemptId attId) {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, 
ResourceRequest.ANY,
-        priority, 1);
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+  
+  private void createSchedulingRequestExistingApplication(ResourceRequest 
request,
+      ApplicationAttemptId attId) {
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ask.add(request);
     scheduler.allocate(attId, ask,  new ArrayList<ContainerId>());
   }
@@ -499,14 +505,16 @@ public class TestFairScheduler {
     // First ask, queue1 requests 1 large (minReqSize * 2).
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ResourceRequest request1 =
-        createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1);
+        createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
     ask1.add(request1);
     scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
 
     // Second ask, queue2 requests 1 large + (2 * minReqSize)
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
-    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 
1);
-    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2);
+    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 
1,
+        false);
+    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2,
+        false);
     ask2.add(request2);
     ask2.add(request3);
     scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
@@ -514,7 +522,7 @@ public class TestFairScheduler {
     // Third ask, queue2 requests 1 large
     List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
     ResourceRequest request4 =
-        createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1);
+        createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1, true);
     ask3.add(request4);
     scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
 
@@ -1408,12 +1416,12 @@ public class TestFairScheduler {
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
     List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
-    asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1));
-    asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1));
-    asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2));
+    asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
 
     scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
     
@@ -1693,5 +1701,129 @@ public class TestFairScheduler {
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     scheduler.update(); // update shouldn't change things
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+}
+
+  @Test
+  public void testStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, 
"127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, 
"127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+    
+    ResourceRequest nodeRequest = createResourceRequest(1024, 
node1.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, 
node1.getRackName(), 1, 1, false);
+    ResourceRequest anyRequest = createResourceRequest(1024, 
ResourceRequest.ANY,
+        1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node1UpdateEvent = new 
NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent node2UpdateEvent = new 
NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+      assertEquals(0, app.getReservedContainers().size());
+    }
+    // then node1 should get the container
+    scheduler.handle(node1UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+  
+  @Test
+  public void testCancelStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, 
"127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, 
"127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+    
+    ResourceRequest nodeRequest = createResourceRequest(1024, 
node1.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 1, 
false);
+    ResourceRequest anyRequest = createResourceRequest(1024, 
ResourceRequest.ANY,
+        1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node2UpdateEvent = new 
NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+    }
+    
+    // relax locality
+    List<ResourceRequest> update = Arrays.asList(
+        createResourceRequest(1024, node1.getHostName(), 1, 0, true),
+        createResourceRequest(1024, "rack1", 1, 0, true),
+        createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
+    scheduler.allocate(attId1, update, new ArrayList<ContainerId>());
+    
+    // then node2 should get the container
+    scheduler.handle(node2UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+  
+  /**
+   * If we update our ask to strictly request a node, it doesn't make sense to keep
+   * a reservation on another.
+   */
+  @Test
+  public void testReservationsStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, 
"127.0.0.1");
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, 
"127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    
+    ResourceRequest nodeRequest = createResourceRequest(1024, 
node2.getHostName(), 1, 2, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, 
true);
+    ResourceRequest anyRequest = createResourceRequest(1024, 
ResourceRequest.ANY,
+        1, 2, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId);
+    createSchedulingRequestExistingApplication(rackRequest, attId);
+    createSchedulingRequestExistingApplication(anyRequest, attId);
+    
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdateEvent = new 
NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(1, app.getReservedContainers().size());
+    
+    // now, make our request node-specific (on a different node)
+    rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
+    anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
+        new ArrayList<ContainerId>());
+
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(0, app.getReservedContainers().size());
   }
 }


Reply via email to