Author: llu
Date: Sat Jun 29 20:22:19 2013
New Revision: 1498024

URL: http://svn.apache.org/r1498024
Log:
YARN-877. Support resource blacklisting for FifoScheduler. (Junping Du via llu)

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1498024&r1=1498023&r2=1498024&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jun 29 20:22:19 2013
@@ -391,6 +391,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-750. Allow for black-listing resources in YARN API and Impl in CS
     (acmurthy via bikas)
 
+    YARN-877. Support resource blacklisting for FifoScheduler.
+    (Junping Du via llu)
+
     YARN-686. Flatten NodeReport. (sandyr via tucu)
 
     YARN-737. Throw some specific exceptions directly instead of wrapping them

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1498024&r1=1498023&r2=1498024&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Sat Jun 29 20:22:19 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -816,7 +817,7 @@ public class LeafQueue implements CSQueu
 
       synchronized (application) {
         // Check if this resource is on the blacklist
-        if (isBlacklisted(application, node)) {
+        if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
           continue;
         }
         
@@ -902,28 +903,6 @@ public class LeafQueue implements CSQueu
     return NULL_ASSIGNMENT;
 
   }
-  
-  boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node) {
-    if (application.isBlacklisted(node.getHostName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping 'host' " + node.getHostName() + 
-            " for " + application.getApplicationId() + 
-            " since it has been blacklisted");
-      }
-      return true;
-    }
-
-    if (application.isBlacklisted(node.getRackName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping 'rack' " + node.getRackName() + 
-            " for " + application.getApplicationId() + 
-            " since it has been blacklisted");
-      }
-      return true;
-    }
-
-    return false;
-  }
 
   private synchronized CSAssignment 
   assignReservedContainer(FiCaSchedulerApp application, 

Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java?rev=1498024&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java Sat Jun 29 20:22:19 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
+
+import org.apache.commons.logging.Log;
+
+public class FiCaSchedulerUtils {
+
+  public static  boolean isBlacklisted(FiCaSchedulerApp application,
+      FiCaSchedulerNode node, Log LOG) {
+    if (application.isBlacklisted(node.getHostName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping 'host' " + node.getHostName() + 
+            " for " + application.getApplicationId() + 
+            " since it has been blacklisted");
+      }
+      return true;
+    }
+
+    if (application.isBlacklisted(node.getRackName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping 'rack' " + node.getRackName() + 
+            " for " + application.getApplicationId() + 
+            " since it has been blacklisted");
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+}

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1498024&r1=1498023&r2=1498024&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Jun 29 20:22:19 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@@ -290,7 +291,7 @@ public class FifoScheduler implements Re
         application.showRequests();
 
         // Update application requests
-        application.updateResourceRequests(ask, null, null);
+        application.updateResourceRequests(ask, blacklistAdditions, blacklistRemovals);
 
         LOG.debug("allocate: post-update" +
             " applicationId=" + applicationAttemptId + 
@@ -388,6 +389,11 @@ public class FifoScheduler implements Re
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
+        // Check if this resource is on the blacklist
+        if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
+          continue;
+        }
+        
         for (Priority priority : application.getPriorities()) {
           int maxContainers = 
             getMaxAllocatableContainers(application, priority, node, 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1498024&r1=1498023&r2=1498024&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Sat Jun 29 20:22:19 2013
@@ -19,13 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -243,7 +242,6 @@ public class TestFifoScheduler {
 
     fs.handle(new NodeAddedSchedulerEvent(n1));
     fs.handle(new NodeAddedSchedulerEvent(n2));
-    List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
     fs.handle(new NodeUpdateSchedulerEvent(n1));
     Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
 
@@ -258,6 +256,120 @@ public class TestFifoScheduler {
   }
   
   @Test (timeout = 50000)
+  public void testBlackListNodes() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
+
+    int rack_num_0 = 0;
+    int rack_num_1 = 1;
+    // Add 4 nodes in 2 racks
+    
+    // host_0_0 in rack0
+    String host_0_0 = "127.0.0.1";
+    RMNode n1 =
+        MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_0);
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    
+    // host_0_1 in rack0
+    String host_0_1 = "127.0.0.2";
+    RMNode n2 =
+        MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_1);
+    fs.handle(new NodeAddedSchedulerEvent(n2));
+    
+    // host_1_0 in rack1
+    String host_1_0 = "127.0.0.3";
+    RMNode n3 =
+        MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_0);
+    fs.handle(new NodeAddedSchedulerEvent(n3));
+    
+    // host_1_1 in rack1
+    String host_1_1 = "127.0.0.4";
+    RMNode n4 =
+        MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_1);
+    fs.handle(new NodeAddedSchedulerEvent(n4));
+    
+    // Add one application
+    ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+        appId1, 1);
+    SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
+        "user");
+    fs.handle(event1);
+
+    List<ContainerId> emptyId = new ArrayList<ContainerId>();
+    List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
+
+    // Allow rack-locality for rack_1, but blacklist host_1_0
+    
+    // Set up resource requests
+    // Ask for a 1 GB container for app 1
+    List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
+    ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
+        "rack1", BuilderUtils.newResource(GB, 1), 1));
+    ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
+        ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
+    fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null);
+    
+    // Trigger container assignment
+    fs.handle(new NodeUpdateSchedulerEvent(n3));
+    
+    // Get the allocation for the application and verify no allocation on blacklist node
+    Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    
+    Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
+
+    // verify host_1_1 can get allocated as not in blacklist
+    fs.handle(new NodeUpdateSchedulerEvent(n4));
+    Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
+    List<Container> containerList = allocation2.getContainers();
+    for (Container container : containerList) {
+      Assert.assertEquals("Container is allocated on n4",
+          container.getNodeId(), n4.getNodeID());
+    }
+    
+    // Ask for a 1 GB container again for app 1
+    List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
+    // this time, rack0 is also in blacklist, so only host_1_1 is available to
+    // be assigned
+    ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
+        ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
+    fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null);
+    
+    // verify n1 is not qualified to be allocated
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
+    Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
+    
+    // verify n2 is not qualified to be allocated
+    fs.handle(new NodeUpdateSchedulerEvent(n2));
+    Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
+    
+    // verify n3 is not qualified to be allocated
+    fs.handle(new NodeUpdateSchedulerEvent(n3));
+    Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
+    
+    fs.handle(new NodeUpdateSchedulerEvent(n4));
+    Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
+    
+    containerList = allocation6.getContainers();
+    for (Container container : containerList) {
+      Assert.assertEquals("Container is allocated on n4", 
+          container.getNodeId(), n4.getNodeID());
+    }
+
+    rm.stop();
+  }
+  
+  @Test (timeout = 50000)
   public void testHeadroom() throws Exception {
     
     Configuration conf = new Configuration();
@@ -287,7 +399,6 @@ public class TestFifoScheduler {
         "user");
     fs.handle(event2);
 
-    List<ContainerStatus> emptyStatus = new ArrayList<ContainerStatus>();
     List<ContainerId> emptyId = new ArrayList<ContainerId>();
     List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
 


Reply via email to