Author: bikas
Date: Thu Oct  3 21:14:35 2013
New Revision: 1529005

URL: http://svn.apache.org/r1529005
Log:
MAPREDUCE-5489. MR jobs hangs as it does not use the node-blacklisting feature 
in RM requests (Zhijie Shen via bikas)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1529005&r1=1529004&r2=1529005&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Oct  3 21:14:35 2013
@@ -272,6 +272,9 @@ Release 2.1.2 - UNRELEASED
     tests jar is breaking tests for downstream components (Robert Kanter via
     Sandy Ryza)
 
+    MAPREDUCE-5489. MR jobs hangs as it does not use the node-blacklisting
+    feature in RM requests (Zhijie Shen via bikas)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1529005&r1=1529004&r2=1529005&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Oct  3 21:14:35 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -86,6 +87,10 @@ public abstract class RMContainerRequest
   private final Map<String, Integer> nodeFailures = new HashMap<String, 
Integer>();
   private final Set<String> blacklistedNodes = Collections
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final Set<String> blacklistAdditions = Collections
+      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final Set<String> blacklistRemovals = Collections
+      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
   public RMContainerRequestor(ClientService clientService, AppContext context) 
{
     super(clientService, context);
@@ -145,10 +150,13 @@ public abstract class RMContainerRequest
   }
 
   protected AllocateResponse makeRemoteRequest() throws IOException {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(new 
ArrayList<String>(blacklistAdditions),
+            new ArrayList<String>(blacklistRemovals));
     AllocateRequest allocateRequest =
         AllocateRequest.newInstance(lastResponseID,
           super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
-          new ArrayList<ContainerId>(release), null);
+          new ArrayList<ContainerId>(release), blacklistRequest);
     AllocateResponse allocateResponse;
     try {
       allocateResponse = scheduler.allocate(allocateRequest);
@@ -172,6 +180,14 @@ public abstract class RMContainerRequest
 
     ask.clear();
     release.clear();
+
+    if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
+      LOG.info("Update the blacklist for " + applicationId +
+          ": blacklistAdditions=" + blacklistAdditions.size() +
+          " blacklistRemovals=" +  blacklistRemovals.size());
+    }
+    blacklistAdditions.clear();
+    blacklistRemovals.clear();
     return allocateResponse;
   }
 
@@ -195,11 +211,17 @@ public abstract class RMContainerRequest
         if (ignoreBlacklisting.compareAndSet(false, true)) {
           LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
               + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+          // notify RM to ignore all the blacklisted nodes
+          blacklistAdditions.clear();
+          blacklistRemovals.addAll(blacklistedNodes);
         }
       } else {
         if (ignoreBlacklisting.compareAndSet(true, false)) {
           LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
               + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+          // notify RM of all the blacklisted nodes
+          blacklistAdditions.addAll(blacklistedNodes);
+          blacklistRemovals.clear();
         }
       }
     }
@@ -221,6 +243,9 @@ public abstract class RMContainerRequest
     LOG.info(failures + " failures on node " + hostName);
     if (failures >= maxTaskFailuresPerNode) {
       blacklistedNodes.add(hostName);
+      if (!ignoreBlacklisting.get()) {
+        blacklistAdditions.add(hostName);
+      }
       //Even if blacklisting is ignored, continue to remove the host from
       // the request table. The RM may have additional nodes it can allocate 
on.
       LOG.info("Blacklisted host " + hostName);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1529005&r1=1529004&r2=1529005&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Oct  3 21:14:35 2013
@@ -880,8 +880,10 @@ public class TestRMContainerAllocator {
     dispatcher.await();
 
     assigned = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     dispatcher.await();
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+    assertBlacklistAdditionsAndRemovals(2, 0, rm);
 
     // mark h1/h2 as bad nodes
     nodeManager1.nodeHeartbeat(false);
@@ -890,12 +892,14 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
 
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
     dispatcher.await();
-    assigned = allocator.schedule();    
+    assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(0, 0, rm);
         
     Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
     
@@ -948,7 +952,7 @@ public class TestRMContainerAllocator {
     // Known=1, blacklisted=0, ignore should be false - assign first container
     assigned =
         getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator);
+            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
@@ -958,44 +962,52 @@ public class TestRMContainerAllocator {
     allocator.sendFailure(f1);
 
     // Test single node.
+    // Known=1, blacklisted=1, ignore should be true - assign 0
+    // Because makeRemoteRequest will not be aware of it until next call
+    // The current call will send blacklisted node "h1" to RM
+    assigned =
+        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm);
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
     // Known=1, blacklisted=1, ignore should be true - assign 1
     assigned =
         getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator);
+            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
     // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
         getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
-            nodeManagers[1], dispatcher, allocator);
+            nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
     // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
         getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator);
+            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Known=3, blacklisted=1, ignore should be true - assign 1
     assigned =
         getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator);
+            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
     // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
     assigned =
         getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
-            nodeManagers[3], dispatcher, allocator);
+            nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklisting re-enabled.
     // Known=4, blacklisted=1, ignore should be false - no assignment on h1
     assigned =
         getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator);
+            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     // RMContainerRequestor would have created a replacement request.
 
@@ -1004,17 +1016,24 @@ public class TestRMContainerAllocator {
     allocator.sendFailure(f2);
 
     // Test ignore blacklisting re-enabled
+    // Known=4, blacklisted=2, ignore should be true. Should assign 0
+    // container for the same reason above.
+    assigned =
+        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm);
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
     // Known=4, blacklisted=2, ignore should be true. Should assign 2
     // containers.
     assigned =
         getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator);
+            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true.
     assigned =
         getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
-            nodeManagers[1], dispatcher, allocator);
+            nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklist while ignore blacklisting enabled
@@ -1025,7 +1044,7 @@ public class TestRMContainerAllocator {
     // Known=5, blacklisted=3, ignore should be true.
     assigned =
         getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator);
+            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     
     // Assign on 5 more nodes - to re-enable blacklisting
@@ -1034,14 +1053,14 @@ public class TestRMContainerAllocator {
       assigned =
           getContainerOnHost(jobId, 11 + i, 1024,
               new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
-              dispatcher, allocator);
+              dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
       Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     }
 
     // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
     assigned =
         getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator);
+            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
   }
 
@@ -1055,7 +1074,9 @@ public class TestRMContainerAllocator {
   private
       List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
           int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
-          DrainDispatcher dispatcher, MyContainerAllocator allocator)
+          DrainDispatcher dispatcher, MyContainerAllocator allocator,
+          int expectedAdditions1, int expectedRemovals1,
+          int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
           throws Exception {
     ContainerRequestEvent reqEvent =
         createReq(jobId, taskAttemptId, memory, hosts);
@@ -1064,6 +1085,8 @@ public class TestRMContainerAllocator {
     // Send the request to the RM
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(
+        expectedAdditions1, expectedRemovals1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Heartbeat from the required nodeManager
@@ -1072,6 +1095,8 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(
+        expectedAdditions2, expectedRemovals2, rm);
     return assigned;
   }
  
@@ -1137,6 +1162,7 @@ public class TestRMContainerAllocator {
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());    
     
     LOG.info("Failing container _1 on H1 (should blacklist the node)");
@@ -1153,6 +1179,7 @@ public class TestRMContainerAllocator {
     //Update the Scheduler with the new requests.
     assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(1, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // send another request with different resource and priority
@@ -1171,6 +1198,7 @@ public class TestRMContainerAllocator {
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
     
     //RMContainerAllocator gets assigned a p:5 on a blacklisted node.
@@ -1179,6 +1207,7 @@ public class TestRMContainerAllocator {
     LOG.info("RM Heartbeat (To process the re-scheduled containers)");
     assigned = allocator.schedule();
     dispatcher.await();
+    assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     
     //Hearbeat from H3 to schedule on this host.
@@ -1188,6 +1217,7 @@ public class TestRMContainerAllocator {
     
     LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
     assigned = allocator.schedule();
+    assertBlacklistAdditionsAndRemovals(0, 0, rm);
     dispatcher.await();
      
     // For debugging
@@ -1205,7 +1235,15 @@ public class TestRMContainerAllocator {
           + " host not correct", "h3", 
assig.getContainer().getNodeId().getHost());
     }
   }
-  
+
+  private static void assertBlacklistAdditionsAndRemovals(
+      int expectedAdditions, int expectedRemovals, MyResourceManager rm) {
+    Assert.assertEquals(expectedAdditions,
+        rm.getMyFifoScheduler().lastBlacklistAdditions.size());
+    Assert.assertEquals(expectedRemovals,
+        rm.getMyFifoScheduler().lastBlacklistRemovals.size());
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -1220,6 +1258,8 @@ public class TestRMContainerAllocator {
     }
     
     List<ResourceRequest> lastAsk = null;
+    List<String> lastBlacklistAdditions;
+    List<String> lastBlacklistRemovals;
     
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
@@ -1236,6 +1276,8 @@ public class TestRMContainerAllocator {
         askCopy.add(reqCopy);
       }
       lastAsk = ask;
+      lastBlacklistAdditions = blacklistAdditions;
+      lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
           applicationAttemptId, askCopy, release, 
           blacklistAdditions, blacklistRemovals);


Reply via email to