YARN-4344. NMs reconnecting with changed capabilities can lead to wrong cluster 
resource calculations. Contributed by Varun Vasudev


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d36b6e04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d36b6e04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d36b6e04

Branch: refs/heads/yarn-2877
Commit: d36b6e045f317c94e97cb41a163aa974d161a404
Parents: 298a8cb
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Nov 23 20:30:26 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Nov 23 20:30:26 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   6 +
 .../scheduler/capacity/CapacityScheduler.java   |   6 +-
 .../scheduler/fifo/FifoScheduler.java           |   4 +-
 .../capacity/TestCapacityScheduler.java         | 114 ++++++++++++++++++-
 4 files changed, 124 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d36b6e04/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 29eca0c..7a59387 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1082,6 +1082,9 @@ Release 2.7.3 - UNRELEASED
     YARN-3769. Consider user limit when calculating total pending resource for 
     preemption policy in Capacity Scheduler. (Eric Payne via wangda)
 
+    YARN-4344. NMs reconnecting with changed capabilities can lead to wrong
+    cluster resource calculations (Varun Vasudev via jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1936,6 +1939,9 @@ Release 2.6.3 - UNRELEASED
     YARN-2859. ApplicationHistoryServer binds to default port 8188 in MiniYARNCluster.
     (Vinod Kumar Vavilapalli via xgong)
 
+    YARN-4344. NMs reconnecting with changed capabilities can lead to wrong
+    cluster resource calculations (Varun Vasudev via jlowe)
+
 Release 2.6.2 - 2015-10-28
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d36b6e04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1075ee0..00d3ab4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1411,12 +1411,12 @@ public class CapacityScheduler extends
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
         usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
-    Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
 
     // update this node to node label manager
     if (labelManager != null) {
       labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
+          schedulerNode.getTotalResource());
     }
     
     root.updateClusterResource(clusterResource, new ResourceLimits(
@@ -1442,7 +1442,7 @@ public class CapacityScheduler extends
     if (node == null) {
       return;
     }
-    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getTotalResource());
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
     int numNodes = numNodeManagers.decrementAndGet();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d36b6e04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 2ec2311..5999eb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -939,7 +939,7 @@ public class FifoScheduler extends
     updateMaximumAllocation(node, false);
     
     // Update cluster metrics
-    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getTotalResource());
   }
 
   @Override
@@ -962,7 +962,7 @@ public class FifoScheduler extends
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
         usePortForNodeName);
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
-    Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
     updateMaximumAllocation(schedulerNode, true);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d36b6e04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index f0a1d03..7c95cdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -71,11 +72,15 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
@@ -83,10 +88,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@@ -106,6 +114,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -124,6 +133,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -144,7 +154,7 @@ import org.junit.Test;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-
+import org.mockito.Mockito;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -3248,4 +3258,106 @@ public class TestCapacityScheduler {
       Assert.fail("Cannot find RMContainer");
     }
   }
+
+  private class SleepHandler implements EventHandler<SchedulerEvent> {
+    boolean sleepFlag = false;
+    int sleepTime = 20;
+    @Override
+    public void handle(SchedulerEvent event) {
+      try {
+        if(sleepFlag) {
+          Thread.sleep(sleepTime);
+        }
+      }
+      catch(InterruptedException ie) {
+      }
+    }
+  }
+
+  private ResourceTrackerService getPrivateResourceTrackerService(
+      Dispatcher privateDispatcher, SleepHandler sleepHandler) {
+
+    Configuration conf = new Configuration();
+    ResourceTrackerService privateResourceTrackerService;
+
+    RMContext privateContext =
+        new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
+            null, null, null);
+    privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
+
+    privateDispatcher.register(SchedulerEventType.class, sleepHandler);
+    privateDispatcher.register(SchedulerEventType.class,
+        resourceManager.getResourceScheduler());
+    privateDispatcher.register(RMNodeEventType.class,
+        new ResourceManager.NodeEventDispatcher(privateContext));
+    ((Service) privateDispatcher).init(conf);
+    ((Service) privateDispatcher).start();
+    NMLivelinessMonitor nmLivelinessMonitor =
+        new NMLivelinessMonitor(privateDispatcher);
+    nmLivelinessMonitor.init(conf);
+    nmLivelinessMonitor.start();
+    NodesListManager nodesListManager = new NodesListManager(privateContext);
+    nodesListManager.init(conf);
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    containerTokenSecretManager.start();
+    NMTokenSecretManagerInRM nmTokenSecretManager =
+        new NMTokenSecretManagerInRM(conf);
+    nmTokenSecretManager.start();
+    privateResourceTrackerService =
+        new ResourceTrackerService(privateContext, nodesListManager,
+            nmLivelinessMonitor, containerTokenSecretManager,
+            nmTokenSecretManager);
+    privateResourceTrackerService.init(conf);
+    privateResourceTrackerService.start();
+    resourceManager.getResourceScheduler().setRMContext(privateContext);
+    return privateResourceTrackerService;
+  }
+
+  /**
+   * Test the behaviour of the capacity scheduler when a node reconnects
+   * with changed capabilities. This test is to catch any race conditions
+   * that might occur due to the use of the RMNode object.
+   * @throws Exception
+   */
+  @Test
+  public void testNodemanagerReconnect() throws Exception {
+
+    DrainDispatcher privateDispatcher = new DrainDispatcher();
+    SleepHandler sleepHandler = new SleepHandler();
+    ResourceTrackerService privateResourceTrackerService =
+        getPrivateResourceTrackerService(privateDispatcher,
+            sleepHandler);
+
+    // Register node1
+    String hostname1 = "localhost1";
+    Resource capability = BuilderUtils.newResource(4096, 4);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+    RegisterNodeManagerRequest request1 =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+    request1.setNodeId(nodeId1);
+    request1.setHttpPort(0);
+    request1.setResource(capability);
+    privateResourceTrackerService.registerNodeManager(request1);
+    privateDispatcher.await();
+    Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource();
+    Assert.assertEquals("Initial cluster resources don't match", capability,
+        clusterResource);
+
+    Resource newCapability = BuilderUtils.newResource(1024, 1);
+    RegisterNodeManagerRequest request2 =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    request2.setNodeId(nodeId1);
+    request2.setHttpPort(0);
+    request2.setResource(newCapability);
+    // hold up the dispatcher and register the same node with lower capability
+    sleepHandler.sleepFlag = true;
+    privateResourceTrackerService.registerNodeManager(request2);
+    privateDispatcher.await();
+    Assert.assertEquals("Cluster resources don't match", newCapability,
+        resourceManager.getResourceScheduler().getClusterResource());
+    privateResourceTrackerService.stop();
+  }
 }

Reply via email to