Author: tucu
Date: Thu Oct 31 03:05:05 2013
New Revision: 1537370

URL: http://svn.apache.org/r1537370
Log:
YARN-1343. NodeManagers additions/restarts are not reported as node updates in 
AllocateResponse responses to AMs. (tucu)

Added:
    hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
      - copied unchanged from r1537368, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
Modified:
    hadoop/common/branches/branch-2.2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
    hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

Modified: hadoop/common/branches/branch-2.2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-yarn-project/CHANGES.txt?rev=1537370&r1=1537369&r2=1537370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.2/hadoop-yarn-project/CHANGES.txt Thu Oct 31 03:05:05 2013
@@ -71,6 +71,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1358. TestYarnCLI fails on Windows due to line endings. (Chuan Liu via
     cnauroth)
 
+    YARN-1343. NodeManagers additions/restarts are not reported as node updates
+    in AllocateResponse responses to AMs. (tucu)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java?rev=1537370&r1=1537369&r2=1537370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java Thu Oct 31 03:05:05 2013
@@ -160,17 +160,14 @@ public class NodesListManager extends Ab
       if (unusableRMNodesConcurrentSet.contains(eventNode)) {
         LOG.debug(eventNode + " reported usable");
         unusableRMNodesConcurrentSet.remove(eventNode);
-        for (RMApp app : rmContext.getRMApps().values()) {
-          this.rmContext
-              .getDispatcher()
-              .getEventHandler()
-              .handle(
-                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                      RMAppNodeUpdateType.NODE_USABLE));
-        }
-      } else {
-        LOG.warn(eventNode
-            + " reported usable without first reporting unusable");
+      }
+      for (RMApp app : rmContext.getRMApps().values()) {
+        this.rmContext
+            .getDispatcher()
+            .getEventHandler()
+            .handle(
+                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                    RMAppNodeUpdateType.NODE_USABLE));
       }
       break;
     default:

Modified: hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1537370&r1=1537369&r2=1537370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Thu Oct 31 03:05:05 2013
@@ -431,7 +431,10 @@ public class RMNodeImpl implements RMNod
 
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeAddedSchedulerEvent(rmNode));
-      
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_USABLE, rmNode));
+ 
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
         // Old node rejoining
@@ -464,7 +467,7 @@ public class RMNodeImpl implements RMNod
           // Only add new node if old state is not UNHEALTHY
           rmNode.context.getDispatcher().getEventHandler().handle(
               new NodeAddedSchedulerEvent(rmNode));
-         }
+        }
       } else {
         // Reconnected node differs, so replace old node and start new node
         switch (rmNode.getState()) {
@@ -479,6 +482,9 @@ public class RMNodeImpl implements RMNod
         rmNode.context.getDispatcher().getEventHandler().handle(
             new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
       }
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_USABLE, rmNode));
     }
   }
 

Modified: hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1537370&r1=1537369&r2=1537370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Thu Oct 31 03:05:05 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -79,6 +81,18 @@ public class TestRMNodeTransitions {
     }
   }
 
+  private NodesListManagerEvent nodesListManagerEvent = null;
+  
+  private class TestNodeListManagerEventDispatcher implements
+      EventHandler<NodesListManagerEvent> {
+    
+    @Override
+    public void handle(NodesListManagerEvent event) {
+      nodesListManagerEvent = event;
+    }
+
+  }
+
   @Before
   public void setUp() throws Exception {
     InlineDispatcher rmDispatcher = new InlineDispatcher();
@@ -109,8 +123,12 @@ public class TestRMNodeTransitions {
     rmDispatcher.register(SchedulerEventType.class, 
         new TestSchedulerEventDispatcher());
     
+    rmDispatcher.register(NodesListManagerEventType.class,
+        new TestNodeListManagerEventDispatcher());
+
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+    nodesListManagerEvent =  null;
 
   }
   
@@ -389,8 +407,9 @@ public class TestRMNodeTransitions {
 
   private RMNodeImpl getRunningNode() {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
-        null, null);
+        null, capability);
     node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
@@ -405,4 +424,60 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
     return node;
   }
+
+
+  private RMNodeImpl getNewNode() {
+    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+    return node;
+  }
+
+  @Test
+  public void testAdd() {
+    RMNodeImpl node = getNewNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, 
+        nodesListManagerEvent.getType());
+  }
+
+  @Test
+  public void testReconnect() {
+    RMNodeImpl node = getRunningNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+        nodesListManagerEvent.getType());
+  }
+
 }


Reply via email to