Repository: hadoop
Updated Branches:
  refs/heads/trunk 72b047715 -> d0da13229


YARN-4311. Removing nodes from include and exclude lists will not remove them 
from decommissioned nodes list. Contributed by Kuhu Shukla


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d0da1322
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d0da1322
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d0da1322

Branch: refs/heads/trunk
Commit: d0da13229cf692579c8c9db47a93f6c6255392c8
Parents: 72b0477
Author: Jason Lowe <jl...@apache.org>
Authored: Thu May 5 14:07:54 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu May 5 14:07:54 2016 +0000

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   9 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   9 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   9 +
 .../src/main/resources/yarn-default.xml         |  13 +
 .../resourcemanager/NodesListManager.java       | 120 ++++-
 .../server/resourcemanager/RMServerUtils.java   |   2 +-
 .../server/resourcemanager/rmnode/RMNode.java   |   3 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  21 +-
 .../yarn/server/resourcemanager/MockNodes.java  |   9 +
 .../TestResourceTrackerService.java             | 483 ++++++++++++++++++-
 .../webapp/TestRMWebServicesNodes.java          |  12 +-
 11 files changed, 654 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 85096ba..bd737bd 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -204,6 +204,15 @@ public class NodeInfo {
     public ResourceUtilization getNodeUtilization() {
       return null;
     }
+
+    @Override
+    public long getUntrackedTimeStamp() {
+      return 0;
+    }
+
+    @Override
+    public void setUntrackedTimeStamp(long timeStamp) {
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index ab82e66..5048978 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -193,4 +193,13 @@ public class RMNodeWrapper implements RMNode {
   public ResourceUtilization getNodeUtilization() {
     return node.getNodeUtilization();
   }
+
+  @Override
+  public long getUntrackedTimeStamp() {
+    return 0;
+  }
+
+  @Override
+  public void setUntrackedTimeStamp(long timeStamp) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a4213ce..965b6c5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -714,6 +714,15 @@ public class YarnConfiguration extends Configuration {
       "NONE";
 
   /**
+   * Timeout(msec) for an untracked node to remain in shutdown or 
decommissioned
+   * state.
+   */
+  public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
+      RM_PREFIX + "node-removal-untracked.timeout-ms";
+  public static final int
+      DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
+
+  /**
    * RM proxy users' prefix
    */
   public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2be402a..a38d0d8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2752,4 +2752,17 @@
     <name>yarn.timeline-service.webapp.rest-csrf.methods-to-ignore</name>
     <value>GET,OPTIONS,HEAD</value>
   </property>
+
+  <property>
+    <description>
+    The least amount of time(msec.) an inactive (decommissioned or shutdown) 
node can
+    stay in the nodes list of the resourcemanager after being declared 
untracked.
+    A node is marked untracked if and only if it is absent from both include 
and
+    exclude nodemanager lists on the RM. All inactive nodes are checked twice 
per
+    timeout interval or every 10 minutes, whichever is lesser, and marked 
appropriately.
+    The same is done when refreshNodes command (graceful or otherwise) is 
invoked.
+    </description>
+    <name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
+    <value>60000</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 121c418..bb00e60 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService 
implements
   private String excludesFile;
 
   private Resolver resolver;
+  private Timer removalTimer;
+  private int nodeRemovalCheckInterval;
 
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
@@ -105,9 +108,72 @@ public class NodesListManager extends CompositeService 
implements
     } catch (IOException ioe) {
       disableHostsFileReader(ioe);
     }
+
+    final int nodeRemovalTimeout =
+        conf.getInt(
+            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.
+                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+    nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
+        600000));
+    removalTimer = new Timer("Node Removal Timer");
+
+    removalTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        long now = Time.monotonicNow();
+        for (Map.Entry<NodeId, RMNode> entry :
+            rmContext.getInactiveRMNodes().entrySet()) {
+          NodeId nodeId = entry.getKey();
+          RMNode rmNode = entry.getValue();
+          if (isUntrackedNode(rmNode.getHostName())) {
+            if (rmNode.getUntrackedTimeStamp() == 0) {
+              rmNode.setUntrackedTimeStamp(now);
+            } else
+              if (now - rmNode.getUntrackedTimeStamp() >
+                  nodeRemovalTimeout) {
+                RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+                if (result != null) {
+                  decrInactiveNMMetrics(rmNode);
+                  LOG.info("Removed " +result.getState().toString() + " node "
+                      + result.getHostName() + " from inactive nodes list");
+                }
+              }
+          } else {
+            rmNode.setUntrackedTimeStamp(0);
+          }
+        }
+      }
+    }, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
+
     super.serviceInit(conf);
   }
 
+  private void decrInactiveNMMetrics(RMNode rmNode) {
+    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+    switch (rmNode.getState()) {
+    case SHUTDOWN:
+      clusterMetrics.decrNumShutdownNMs();
+      break;
+    case DECOMMISSIONED:
+      clusterMetrics.decrDecommisionedNMs();
+      break;
+    case LOST:
+      clusterMetrics.decrNumLostNMs();
+      break;
+    case REBOOTED:
+      clusterMetrics.decrNumRebootedNMs();
+      break;
+    default:
+      LOG.debug("Unexpected node state");
+    }
+  }
+
+  @Override
+  public void serviceStop() {
+    removalTimer.cancel();
+  }
+
   private void printConfiguredHosts() {
     if (!LOG.isDebugEnabled()) {
       return;
@@ -131,10 +197,13 @@ public class NodesListManager extends CompositeService 
implements
 
     for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
       if (!isValidNode(nodeId.getHost())) {
+        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+            new RMNodeEvent(nodeId, nodeEventType));
       }
     }
+    updateInactiveNodes();
   }
 
   private void refreshHostsReader(Configuration yarnConf) throws IOException,
@@ -172,6 +241,16 @@ public class NodesListManager extends CompositeService 
implements
   }
 
   @VisibleForTesting
+  public int getNodeRemovalCheckInterval() {
+    return nodeRemovalCheckInterval;
+  }
+
+  @VisibleForTesting
+  public void setNodeRemovalCheckInterval(int interval) {
+    this.nodeRemovalCheckInterval = interval;
+  }
+
+  @VisibleForTesting
   public Resolver getResolver() {
     return resolver;
   }
@@ -374,6 +453,33 @@ public class NodesListManager extends CompositeService 
implements
     return hostsReader;
   }
 
+  private void updateInactiveNodes() {
+    long now = Time.monotonicNow();
+    for(Entry<NodeId, RMNode> entry :
+        rmContext.getInactiveRMNodes().entrySet()) {
+      NodeId nodeId = entry.getKey();
+      RMNode rmNode = entry.getValue();
+      if (isUntrackedNode(nodeId.getHost()) &&
+          rmNode.getUntrackedTimeStamp() == 0) {
+        rmNode.setUntrackedTimeStamp(now);
+      }
+    }
+  }
+
+  public boolean isUntrackedNode(String hostName) {
+    boolean untracked;
+    String ip = resolver.resolve(hostName);
+
+    synchronized (hostsReader) {
+      Set<String> hostsList = hostsReader.getHosts();
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      untracked = !hostsList.isEmpty() &&
+          !hostsList.contains(hostName) && !hostsList.contains(ip) &&
+          !excludeList.contains(hostName) && !excludeList.contains(ip);
+    }
+    return untracked;
+  }
+
   /**
    * Refresh the nodes gracefully
    *
@@ -384,11 +490,13 @@ public class NodesListManager extends CompositeService 
implements
   public void refreshNodesGracefully(Configuration conf) throws IOException,
       YarnException {
     refreshHostsReader(conf);
-    for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
+    for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
       NodeId nodeId = entry.getKey();
       if (!isValidNode(nodeId.getHost())) {
+        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+            new RMNodeEvent(nodeId, nodeEventType));
       } else {
         // Recommissioning the nodes
         if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@@ -397,6 +505,7 @@ public class NodesListManager extends CompositeService 
implements
         }
       }
     }
+    updateInactiveNodes();
   }
 
   /**
@@ -420,8 +529,11 @@ public class NodesListManager extends CompositeService 
implements
   public void refreshNodesForcefully() {
     for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
       if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+        RMNodeEventType nodeEventType =
+            isUntrackedNode(entry.getKey().getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+            new RMNodeEvent(entry.getKey(), nodeEventType));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e19d55e..1318d58 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -87,7 +87,7 @@ public class RMServerUtils {
         acceptedStates.contains(NodeState.LOST) ||
         acceptedStates.contains(NodeState.REBOOTED)) {
       for (RMNode rmNode : context.getInactiveRMNodes().values()) {
-        if (acceptedStates.contains(rmNode.getState())) {
+        if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
           results.add(rmNode);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 3bf9538..0e281d8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -172,4 +172,7 @@ public interface RMNode {
 
   public QueuedContainersStatus getQueuedContainersStatus();
 
+  long getUntrackedTimeStamp();
+
+  void setUntrackedTimeStamp(long timeStamp);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3179169..2e3d10f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -121,6 +122,7 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
   private long lastHealthReportTime;
   private String nodeManagerVersion;
 
+  private long timeStamp;
   /* Aggregated resource utilization for the containers. */
   private ResourceUtilization containersUtilization;
   /* Resource utilization for the node. */
@@ -263,6 +265,9 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
 
       .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
           RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+          RMNodeEventType.SHUTDOWN,
+          new DeactivateNodeTransition(NodeState.SHUTDOWN))
 
       // TODO (in YARN-3223) update resource when container finished.
       .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@@ -350,6 +355,7 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
     this.healthReport = "Healthy";
     this.lastHealthReportTime = System.currentTimeMillis();
     this.nodeManagerVersion = nodeManagerVersion;
+    this.timeStamp = 0;
 
     this.latestNodeHeartBeatResponse.setResponseId(0);
 
@@ -1015,7 +1021,7 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
   }
 
   /**
-   * Put a node in deactivated (decommissioned) status.
+   * Put a node in deactivated (decommissioned or shutdown) status.
    * @param rmNode
    * @param finalState
    */
@@ -1032,6 +1038,9 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
     LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
         + finalState);
     rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+    if (rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) 
{
+      rmNode.setUntrackedTimeStamp(Time.monotonicNow());
+    }
   }
 
   /**
@@ -1408,4 +1417,14 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
       this.writeLock.unlock();
     }
   }
+
+  @Override
+  public long getUntrackedTimeStamp() {
+    return this.timeStamp;
+  }
+
+  @Override
+  public void setUntrackedTimeStamp(long ts) {
+    this.timeStamp = ts;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index f5b61a3..2b4d2fc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -264,6 +264,15 @@ public class MockNodes {
     public QueuedContainersStatus getQueuedContainersStatus() {
       return null;
     }
+
+    @Override
+    public long getUntrackedTimeStamp() {
+      return 0;
+    }
+
+    @Override
+    public void setUntrackedTimeStamp(long timeStamp) {
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index f2f71ce..cac4511 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -31,6 +31,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
@@ -48,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -142,12 +142,12 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
 
     rm.getNodesListManager().refreshNodes(conf);
 
-    checkDecommissionedNMCount(rm, ++metricCount);
+    checkShutdownNMCount(rm, ++metricCount);
 
     nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     Assert
-        .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+      .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
 
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@@ -156,7 +156,8 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     nodeHeartbeat = nm3.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
-      .getNumDecommisionedNMs());
+      .getNumShutdownNMs());
+    rm.stop();
   }
 
   /**
@@ -227,7 +228,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     MockNM nm2 = rm.registerNode("host2:5678", 10240);
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
     assert(metrics != null);
-    int initialMetricCount = metrics.getNumDecommisionedNMs();
+    int initialMetricCount = metrics.getNumShutdownNMs();
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertEquals(
         NodeAction.NORMAL,
@@ -240,16 +241,16 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
         .getAbsolutePath());
     rm.getNodesListManager().refreshNodes(conf);
-    checkDecommissionedNMCount(rm, ++initialMetricCount);
+    checkShutdownNMCount(rm, ++initialMetricCount);
     nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertEquals(
-        "Node should not have been decomissioned.",
+        "Node should not have been shutdown.",
         NodeAction.NORMAL,
         nodeHeartbeat.getNodeAction());
-    nodeHeartbeat = nm2.nodeHeartbeat(true);
-    Assert.assertEquals("Node should have been decomissioned but is in state" +
-        nodeHeartbeat.getNodeAction(),
-        NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
+    NodeState nodeState =
+        rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
+    Assert.assertEquals("Node should have been shutdown but is in state" +
+            nodeState, NodeState.SHUTDOWN, nodeState);
   }
   
   /**
@@ -510,7 +511,8 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
     Assert
         .assertFalse(
-            "Node Labels should not accepted by RM If its configured with 
Central configuration",
+            "Node Labels should not accepted by RM If its configured with " +
+                "Central configuration",
             response.getAreNodeLabelsAcceptedByRM());
     if (rm != null) {
       rm.stop();
@@ -892,15 +894,15 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
 
     // node unhealthy
     nm1.nodeHeartbeat(false);
-    checkUnealthyNMCount(rm, nm1, true, 1);
+    checkUnhealthyNMCount(rm, nm1, true, 1);
 
     // node healthy again
     nm1.nodeHeartbeat(true);
-    checkUnealthyNMCount(rm, nm1, false, 0);
+    checkUnhealthyNMCount(rm, nm1, false, 0);
   }
   
-  private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health,
-      int count) throws Exception {
+  private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health,
+                                     int count) throws Exception {
     
     int waitCount = 0;
     while((rm.getRMContext().getRMNodes().get(nm1.getNodeId())
@@ -1002,7 +1004,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(false);
     rm.drainEvents();
-    checkUnealthyNMCount(rm, nm2, true, 1);
+    checkUnhealthyNMCount(rm, nm2, true, 1);
     final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
     QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
     // TODO Metrics incorrect in case of the FifoScheduler
@@ -1014,7 +1016,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     rm.drainEvents();
     Assert.assertEquals(expectedNMs, 
ClusterMetrics.getMetrics().getNumActiveNMs());
-    checkUnealthyNMCount(rm, nm2, true, 1);
+    checkUnhealthyNMCount(rm, nm2, true, 1);
 
     // reconnect of unhealthy node
     nm2 = rm.registerNode("host2:5678", 5120);
@@ -1022,7 +1024,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     rm.drainEvents();
     Assert.assertEquals(expectedNMs, 
ClusterMetrics.getMetrics().getNumActiveNMs());
-    checkUnealthyNMCount(rm, nm2, true, 1);
+    checkUnhealthyNMCount(rm, nm2, true, 1);
     
     // unhealthy node changed back to healthy
     nm2 = rm.registerNode("host2:5678", 5120);
@@ -1104,7 +1106,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
 
     // node unhealthy
     nm1.nodeHeartbeat(false);
-    checkUnealthyNMCount(rm, nm1, true, 1);
+    checkUnhealthyNMCount(rm, nm1, true, 1);
     UnRegisterNodeManagerRequest request = Records
         .newRecord(UnRegisterNodeManagerRequest.class);
     request.setNodeId(nm1.getNodeId());
@@ -1119,8 +1121,6 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     rm.start();
     ResourceTrackerService resourceTrackerService = rm
         .getResourceTrackerService();
-    int shutdownNMsCount = ClusterMetrics.getMetrics()
-        .getNumShutdownNMs();
     int decommisionedNMsCount = ClusterMetrics.getMetrics()
         .getNumDecommisionedNMs();
 
@@ -1145,10 +1145,12 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     rm.getNodesListManager().refreshNodes(conf);
     NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
     Assert.assertEquals(NodeAction.SHUTDOWN, 
heartbeatResponse.getNodeAction());
+    int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
     checkShutdownNMCount(rm, shutdownNMsCount);
-    checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+    checkDecommissionedNMCount(rm, decommisionedNMsCount);
     request.setNodeId(nm1.getNodeId());
     resourceTrackerService.unRegisterNodeManager(request);
+    shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
     checkShutdownNMCount(rm, shutdownNMsCount);
     checkDecommissionedNMCount(rm, decommisionedNMsCount);
 
@@ -1164,8 +1166,9 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     rm.getNodesListManager().refreshNodes(conf);
     request.setNodeId(nm2.getNodeId());
     resourceTrackerService.unRegisterNodeManager(request);
-    checkShutdownNMCount(rm, shutdownNMsCount);
-    checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+    checkShutdownNMCount(rm, ++shutdownNMsCount);
+    checkDecommissionedNMCount(rm, decommisionedNMsCount);
+    rm.stop();
   }
 
   @Test(timeout = 30000)
@@ -1300,6 +1303,434 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     rm.stop();
   }
 
+  /**
+   * Remove a node from all lists and check if its forgotten
+   */
+  @Test
+  public void testNodeRemovalNormally() throws Exception {
+    testNodeRemovalUtil(false);
+    testNodeRemovalUtilLost(false);
+    testNodeRemovalUtilRebooted(false);
+    testNodeRemovalUtilUnhealthy(false);
+  }
+
+  @Test
+  public void testNodeRemovalGracefully() throws Exception {
+    testNodeRemovalUtil(true);
+    testNodeRemovalUtilLost(true);
+    testNodeRemovalUtilRebooted(true);
+    testNodeRemovalUtilUnhealthy(true);
+  }
+
+  public void refreshNodesOption(boolean doGraceful, Configuration conf)
+      throws Exception {
+    if (doGraceful) {
+      rm.getNodesListManager().refreshNodesGracefully(conf);
+    } else {
+      rm.getNodesListManager().refreshNodes(conf);
+    }
+  }
+
+  public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
+    Configuration conf = new Configuration();
+    int timeoutValue = 500;
+    File excludeHostFile = new File(TEMP_DIR + File.separator +
+        "excludeHostFile.txt");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
+    
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+        timeoutValue);
+    CountDownLatch latch = new CountDownLatch(1);
+    rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+    RMContext rmContext = rm.getRMContext();
+    refreshNodesOption(doGraceful, conf);
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    assert (metrics != null);
+
+    //check all 3 nodes joined in as NORMAL
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    rm.drainEvents();
+    Assert.assertEquals("All 3 nodes should be active",
+        metrics.getNumActiveNMs(), 3);
+
+    //Remove nm2 from include list, should now be shutdown with timer test
+    String ip = NetUtils.normalizeHostName("localhost");
+    writeToHostsFile("host1", ip);
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    rm.drainEvents();
+    Assert.assertTrue("Node should not be in active node list",
+        !rmContext.getRMNodes().containsKey(nm2.getNodeId()));
+
+    RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should be in inactive node list",
+        rmNode.getState(), NodeState.SHUTDOWN);
+    Assert.assertEquals("Active nodes should be 2",
+        metrics.getNumActiveNMs(), 2);
+    Assert.assertEquals("Shutdown nodes should be 1",
+        metrics.getNumShutdownNMs(), 1);
+
+    int nodeRemovalTimeout =
+        conf.getInt(
+            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.
+                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+    int nodeRemovalInterval =
+        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+    latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+    rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should have been forgotten!",
+        rmNode, null);
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+
+    //Check node removal and re-addition before timer expires
+    writeToHostsFile("host1", ip, "host2");
+    refreshNodesOption(doGraceful, conf);
+    nm2 = rm.registerNode("host2:5678", 10240);
+    rm.drainEvents();
+    writeToHostsFile("host1", ip);
+    refreshNodesOption(doGraceful, conf);
+    rm.drainEvents();
+    rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should be shutdown",
+        rmNode.getState(), NodeState.SHUTDOWN);
+    Assert.assertEquals("Active nodes should be 2",
+        metrics.getNumActiveNMs(), 2);
+    Assert.assertEquals("Shutdown nodes should be 1",
+        metrics.getNumShutdownNMs(), 1);
+
+    //add back the node before timer expires
+    latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
+    writeToHostsFile("host1", ip, "host2");
+    refreshNodesOption(doGraceful, conf);
+    nm2 = rm.registerNode("host2:5678", 10240);
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    rm.drainEvents();
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+    Assert.assertEquals("All 3 nodes should be active",
+        metrics.getNumActiveNMs(), 3);
+
+    //Decommission this node, check timer doesn't remove it
+    writeToHostsFile("host1", "host2", ip);
+    writeToHostsFile(excludeHostFile, "host2");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
+        .getAbsolutePath());
+    refreshNodesOption(doGraceful, conf);
+    rm.drainEvents();
+    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+             rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+        (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+            (rmNode.getState() == NodeState.DECOMMISSIONING));
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+          metrics.getNumDecommisionedNMs(), 1);
+    }
+    latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+             rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+        (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+            (rmNode.getState() == NodeState.DECOMMISSIONING));
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+          metrics.getNumDecommisionedNMs(), 1);
+    }
+
+    //Test decommed/ing node that transitions to untracked,timer should remove
+    writeToHostsFile("host1", ip, "host2");
+    writeToHostsFile(excludeHostFile, "host2");
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    //nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+             rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertNotEquals("Timer for this node was not canceled!",
+        rmNode, null);
+    Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+        (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+            (rmNode.getState() == NodeState.DECOMMISSIONING));
+
+    writeToHostsFile("host1", ip);
+    writeToHostsFile(excludeHostFile, "");
+    refreshNodesOption(doGraceful, conf);
+    latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+             rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should have been forgotten!",
+        rmNode, null);
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumDecommisionedNMs(), 0);
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+    Assert.assertEquals("Active nodes should be 2",
+        metrics.getNumActiveNMs(), 2);
+
+    rm.stop();
+  }
+
+  private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
+    int timeoutValue = 500;
+    File excludeHostFile = new File(TEMP_DIR + File.separator +
+        "excludeHostFile.txt");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+    writeToHostsFile(hostFile, "host1", "localhost", "host2");
+    writeToHostsFile(excludeHostFile, "");
+    
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+        timeoutValue);
+
+    rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+    RMContext rmContext = rm.getRMContext();
+    refreshNodesOption(doGraceful, conf);
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+    ClusterMetrics metrics = clusterMetrics;
+    assert (metrics != null);
+    rm.drainEvents();
+    //check all 3 nodes joined in as NORMAL
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    rm.drainEvents();
+    Assert.assertEquals("All 3 nodes should be active",
+        metrics.getNumActiveNMs(), 3);
+    int waitCount = 0;
+    while(waitCount ++<20){
+      synchronized (this) {
+        wait(200);
+      }
+      nm3.nodeHeartbeat(true);
+      nm1.nodeHeartbeat(true);
+    }
+    Assert.assertNotEquals("host2 should be a lost NM!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+    Assert.assertEquals("host2 should be a lost NM!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+        NodeState.LOST);
+    Assert.assertEquals("There should be 1 Lost NM!",
+        clusterMetrics.getNumLostNMs(), 1);
+    Assert.assertEquals("There should be 2 Active NM!",
+        clusterMetrics.getNumActiveNMs(), 2);
+    int nodeRemovalTimeout =
+        conf.getInt(
+            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.
+                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+    int nodeRemovalInterval =
+        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+    writeToHostsFile(hostFile, "host1", "localhost");
+    writeToHostsFile(excludeHostFile, "");
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    rm.drainEvents();
+    waitCount = 0;
+    while(rmContext.getInactiveRMNodes().get(
+        nm2.getNodeId()) != null && waitCount++ < 2){
+      synchronized (this) {
+        wait(maxThreadSleeptime);
+        nm1.nodeHeartbeat(true);
+        nm2.nodeHeartbeat(true);
+      }
+    }
+    Assert.assertEquals("host2 should have been forgotten!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+    Assert.assertEquals("There should be no Lost NMs!",
+        clusterMetrics.getNumLostNMs(), 0);
+    Assert.assertEquals("There should be 2 Active NM!",
+        clusterMetrics.getNumActiveNMs(), 2);
+    rm.stop();
+  }
+
+  private void testNodeRemovalUtilRebooted(boolean doGraceful)
+      throws Exception {
+    Configuration conf = new Configuration();
+    int timeoutValue = 500;
+    File excludeHostFile = new File(TEMP_DIR + File.separator +
+        "excludeHostFile.txt");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+    writeToHostsFile(hostFile, "host1", "localhost", "host2");
+    writeToHostsFile(excludeHostFile, "");
+    
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+        timeoutValue);
+
+    rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+    RMContext rmContext = rm.getRMContext();
+    refreshNodesOption(doGraceful, conf);
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+    ClusterMetrics metrics = clusterMetrics;
+    assert (metrics != null);
+    NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat(
+        new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
+    rm.drainEvents();
+    rm.drainEvents();
+
+    Assert.assertNotEquals("host2 should be a rebooted NM!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+    Assert.assertEquals("host2 should be a rebooted NM!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+        NodeState.REBOOTED);
+    Assert.assertEquals("There should be 1 Rebooted NM!",
+        clusterMetrics.getNumRebootedNMs(), 1);
+    Assert.assertEquals("There should be 2 Active NM!",
+        clusterMetrics.getNumActiveNMs(), 2);
+
+    int nodeRemovalTimeout =
+        conf.getInt(
+            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.
+                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+    int nodeRemovalInterval =
+        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+    writeToHostsFile(hostFile, "host1", "localhost");
+    writeToHostsFile(excludeHostFile, "");
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    rm.drainEvents();
+    int waitCount = 0;
+    while(rmContext.getInactiveRMNodes().get(
+        nm2.getNodeId()) != null && waitCount++ < 2){
+      synchronized (this) {
+        wait(maxThreadSleeptime);
+      }
+    }
+    Assert.assertEquals("host2 should have been forgotten!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+    Assert.assertEquals("There should be no Rebooted NMs!",
+        clusterMetrics.getNumRebootedNMs(), 0);
+    Assert.assertEquals("There should be 2 Active NM!",
+        clusterMetrics.getNumActiveNMs(), 2);
+    rm.stop();
+  }
+
+  private void testNodeRemovalUtilUnhealthy(boolean doGraceful)
+      throws Exception {
+    Configuration conf = new Configuration();
+    int timeoutValue = 500;
+    File excludeHostFile = new File(TEMP_DIR + File.separator +
+        "excludeHostFile.txt");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+    writeToHostsFile(hostFile, "host1", "localhost", "host2");
+    writeToHostsFile(excludeHostFile, "");
+    
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+        timeoutValue);
+
+    rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+    RMContext rmContext = rm.getRMContext();
+    refreshNodesOption(doGraceful, conf);
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+    ClusterMetrics metrics = clusterMetrics;
+    assert (metrics != null);
+    rm.drainEvents();
+    //check all 3 nodes joined in as NORMAL
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    rm.drainEvents();
+    Assert.assertEquals("All 3 nodes should be active",
+        metrics.getNumActiveNMs(), 3);
+    // node healthy
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(false);
+    nm3.nodeHeartbeat(true);
+    checkUnhealthyNMCount(rm, nm2, true, 1);
+    writeToHostsFile(hostFile, "host1", "localhost");
+    writeToHostsFile(excludeHostFile, "");
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(false);
+    nm3.nodeHeartbeat(true);
+    rm.drainEvents();
+    Assert.assertNotEquals("host2 should be a shutdown NM!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+    Assert.assertEquals("host2 should be a shutdown NM!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+        NodeState.SHUTDOWN);
+    Assert.assertEquals("There should be 2 Active NM!",
+        clusterMetrics.getNumActiveNMs(), 2);
+    Assert.assertEquals("There should be 1 Shutdown NM!",
+        clusterMetrics.getNumShutdownNMs(), 1);
+    Assert.assertEquals("There should be 0 Unhealthy NM!",
+        clusterMetrics.getUnhealthyNMs(), 0);
+    int nodeRemovalTimeout =
+        conf.getInt(
+            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.
+                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+    int nodeRemovalInterval =
+        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+    int waitCount = 0;
+    while(rmContext.getInactiveRMNodes().get(
+        nm2.getNodeId()) != null && waitCount++ < 2){
+      synchronized (this) {
+        wait(maxThreadSleeptime);
+      }
+    }
+    Assert.assertEquals("host2 should have been forgotten!",
+        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+    Assert.assertEquals("There should be no Shutdown NMs!",
+        clusterMetrics.getNumRebootedNMs(), 0);
+    Assert.assertEquals("There should be 2 Active NM!",
+        clusterMetrics.getNumActiveNMs(), 2);
+    rm.stop();
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
    writeToHostsFile(hostFile, hosts);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0da1322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 94bd253..50d4e04 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -292,8 +292,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase 
{
       RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
       WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
           info.getString("nodeHTTPAddress"));
-      WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
-          .toString(), info.getString("state"));
+      if (rmNode != null) {
+        WebServicesTestUtils.checkStringMatch("state",
+            rmNode.getState().toString(), info.getString("state"));
+      }
     }
   }
   
@@ -319,8 +321,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase 
{
         rm.getRMContext().getInactiveRMNodes().get(rmnode2.getNodeID());
     WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
         info.getString("nodeHTTPAddress"));
-    WebServicesTestUtils.checkStringMatch("state",
-        rmNode.getState().toString(), info.getString("state"));
+    if (rmNode != null) {
+      WebServicesTestUtils.checkStringMatch("state",
+          rmNode.getState().toString(), info.getString("state"));
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to