YARN-3980. Plumb resource-utilization info in node heartbeat through to the 
scheduler. (Inigo Goiri via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52948bb2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52948bb2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52948bb2

Branch: refs/heads/yarn-2877
Commit: 52948bb20bd1446164df1d3920c46c96dad750ae
Parents: f80dc6f
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Nov 24 10:05:12 2015 +0530
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Nov 24 13:47:17 2015 +0530

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |  11 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |  11 +
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../impl/pb/ResourceUtilizationPBImpl.java      |   2 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |   3 +-
 .../resourcemanager/ResourceTrackerService.java |   5 +-
 .../server/resourcemanager/rmnode/RMNode.java   |  15 +-
 .../resourcemanager/rmnode/RMNodeImpl.java      |  55 +++++
 .../rmnode/RMNodeStatusEvent.java               |  53 ++--
 .../scheduler/SchedulerNode.java                |  38 +++
 .../scheduler/capacity/CapacityScheduler.java   |   5 +
 .../scheduler/fair/FairScheduler.java           |   5 +
 .../scheduler/fifo/FifoScheduler.java           |   4 +
 .../yarn/server/resourcemanager/MockNodes.java  |  28 ++-
 .../resourcemanager/TestRMNodeTransitions.java  |  16 +-
 .../TestRMAppLogAggregationStatus.java          |  32 +--
 .../webapp/TestRMWebServicesNodes.java          |   6 +-
 .../hadoop/yarn/server/MiniYARNCluster.java     |  54 +++-
 .../TestMiniYarnClusterNodeUtilization.java     | 245 +++++++++++++++++++
 19 files changed, 522 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index dae2ce7..f5943a8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -188,6 +189,16 @@ public class NodeInfo {
       // TODO Auto-generated method stub
       return null;
     }
+
+    @Override
+    public ResourceUtilization getAggregatedContainersUtilization() {
+      return null;
+    }
+
+    @Override
+    public ResourceUtilization getNodeUtilization() {
+      return null;
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 8c65ccc..e778188 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -176,4 +177,14 @@ public class RMNodeWrapper implements RMNode {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public ResourceUtilization getAggregatedContainersUtilization() {
+    return node.getAggregatedContainersUtilization();
+  }
+
+  @Override
+  public ResourceUtilization getNodeUtilization() {
+    return node.getNodeUtilization();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a76c835..0532e1d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -568,6 +568,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3454. Add efficient merge operation to RLESparseResourceAllocation
     (Carlo Curino via asuresh)
 
+    YARN-3980. Plumb resource-utilization info in node heartbeat through to the 
+    scheduler. (Inigo Goiri via kasha)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java
index 01cda7a..29f79d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java
@@ -75,7 +75,7 @@ public class ResourceUtilizationPBImpl extends ResourceUtilization {
   @Override
   public void setVirtualMemory(int vmem) {
     maybeInitBuilder();
-    builder.setPmem(vmem);
+    builder.setVmem(vmem);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 3f8cf32..34267b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -417,7 +417,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return appList;
   }
 
-  private NodeStatus getNodeStatus(int responseId) throws IOException {
+  @VisibleForTesting
+  protected NodeStatus getNodeStatus(int responseId) throws IOException {
 
     NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
     nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 3638a19..bd24b25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -461,10 +461,7 @@ public class ResourceTrackerService extends AbstractService implements
 
     // 4. Send status to RMNode, saving the latest response.
     RMNodeStatusEvent nodeStatusEvent =
-        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
-            remoteNodeStatus.getContainersStatuses(),
-            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse,
-            remoteNodeStatus.getIncreasedContainers());
+        new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
     if (request.getLogAggregationReportsForApps() != null
         && !request.getLogAggregationReportsForApps().isEmpty()) {
       nodeStatusEvent.setLogAggregationReportsForApps(request

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index f28422a..1a172e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 
 /**
  * Node managers information on available resources 
@@ -98,7 +99,19 @@ public interface RMNode {
    * @return the total available resource.
    */
   public Resource getTotalCapability();
-  
+
+  /**
+   * the aggregated resource utilization of the containers.
+   * @return the aggregated resource utilization of the containers.
+   */
+  public ResourceUtilization getAggregatedContainersUtilization();
+
+  /**
+   * the total resource utilization of the node.
+   * @return the total resource utilization of the node.
+   */
+  public ResourceUtilization getNodeUtilization();
+
   /**
    * The rack name for this node manager.
    * @return the rack name.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index e0d27d6..146b0d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@@ -114,6 +115,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private long lastHealthReportTime;
   private String nodeManagerVersion;
 
+  /* Aggregated resource utilization for the containers. */
+  private ResourceUtilization containersUtilization;
+  /* Resource utilization for the node. */
+  private ResourceUtilization nodeUtilization;
+
   private final ContainerAllocationExpirer containerAllocationExpirer;
   /* set of containers that have just launched */
   private final Set<ContainerId> launchedContainers =
@@ -446,6 +452,49 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   @Override
+  public ResourceUtilization getAggregatedContainersUtilization() {
+    this.readLock.lock();
+
+    try {
+      return this.containersUtilization;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void setAggregatedContainersUtilization(
+      ResourceUtilization containersUtilization) {
+    this.writeLock.lock();
+
+    try {
+      this.containersUtilization = containersUtilization;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Override
+  public ResourceUtilization getNodeUtilization() {
+    this.readLock.lock();
+
+    try {
+      return this.nodeUtilization;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void setNodeUtilization(ResourceUtilization nodeUtilization) {
+    this.writeLock.lock();
+
+    try {
+      this.nodeUtilization = nodeUtilization;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Override
   public NodeState getState() {
     this.readLock.lock();
 
@@ -1006,6 +1055,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
       rmNode.setLastHealthReportTime(
           remoteNodeHealthStatus.getLastHealthReportTime());
+      rmNode.setAggregatedContainersUtilization(
+          statusEvent.getAggregatedContainersUtilization());
+      rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
       NodeState initialState = rmNode.getState();
       boolean isNodeDecommissioning =
           initialState.equals(NodeState.DECOMMISSIONING);
@@ -1083,6 +1135,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
       rmNode.setLastHealthReportTime(
           remoteNodeHealthStatus.getLastHealthReportTime());
+      rmNode.setAggregatedContainersUtilization(
+          statusEvent.getAggregatedContainersUtilization());
+      rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
       if (remoteNodeHealthStatus.getIsNodeHealthy()) {
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeAddedSchedulerEvent(rmNode));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index 0dbea1a..afc417d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -28,52 +28,35 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 
 public class RMNodeStatusEvent extends RMNodeEvent {
 
-  private final NodeHealthStatus nodeHealthStatus;
-  private final List<ContainerStatus> containersCollection;
+  private final NodeStatus nodeStatus;
   private final NodeHeartbeatResponse latestResponse;
-  private final List<ApplicationId> keepAliveAppIds;
   private List<LogAggregationReport> logAggregationReportsForApps;
-  private final List<Container> nmReportedIncreasedContainers;
-  
-  // Used by tests
-  public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
-      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
-      NodeHeartbeatResponse latestResponse) {
-    this(nodeId, nodeHealthStatus, collection, keepAliveAppIds,
-        latestResponse, null);
-  }
 
-  public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
-      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
-      NodeHeartbeatResponse latestResponse,
-      List<Container> nmReportedIncreasedContainers) {
-    this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, latestResponse,
-        null, nmReportedIncreasedContainers);
+  public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
+      NodeHeartbeatResponse latestResponse) {
+    this(nodeId, nodeStatus, latestResponse, null);
   }
 
-  public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
-      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+  public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
       NodeHeartbeatResponse latestResponse,
-      List<LogAggregationReport> logAggregationReportsForApps,
-      List<Container> nmReportedIncreasedContainers) {
+      List<LogAggregationReport> logAggregationReportsForApps) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
-    this.nodeHealthStatus = nodeHealthStatus;
-    this.containersCollection = collection;
-    this.keepAliveAppIds = keepAliveAppIds;
+    this.nodeStatus = nodeStatus;
     this.latestResponse = latestResponse;
     this.logAggregationReportsForApps = logAggregationReportsForApps;
-    this.nmReportedIncreasedContainers = nmReportedIncreasedContainers;
   }
 
   public NodeHealthStatus getNodeHealthStatus() {
-    return this.nodeHealthStatus;
+    return this.nodeStatus.getNodeHealthStatus();
   }
 
   public List<ContainerStatus> getContainers() {
-    return this.containersCollection;
+    return this.nodeStatus.getContainersStatuses();
   }
 
   public NodeHeartbeatResponse getLatestResponse() {
@@ -81,7 +64,15 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   }
   
   public List<ApplicationId> getKeepAliveAppIds() {
-    return this.keepAliveAppIds;
+    return this.nodeStatus.getKeepAliveApplications();
+  }
+
+  public ResourceUtilization getAggregatedContainersUtilization() {
+    return this.nodeStatus.getContainersUtilization();
+  }
+
+  public ResourceUtilization getNodeUtilization() {
+    return this.nodeStatus.getNodeUtilization();
   }
 
   public List<LogAggregationReport> getLogAggregationReportsForApps() {
@@ -95,7 +86,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   
   @SuppressWarnings("unchecked")
   public List<Container> getNMReportedIncreasedContainers() {
-    return nmReportedIncreasedContainers == null ? Collections.EMPTY_LIST
-        : nmReportedIncreasedContainers;
+    return this.nodeStatus.getIncreasedContainers() == null ?
+        Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index f3d3906..e8e1238 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -58,6 +59,10 @@ public abstract class SchedulerNode {
   private Resource totalResourceCapability;
   private RMContainer reservedContainer;
   private volatile int numContainers;
+  private volatile ResourceUtilization containersUtilization =
+      ResourceUtilization.newInstance(0, 0, 0f);
+  private volatile ResourceUtilization nodeUtilization =
+      ResourceUtilization.newInstance(0, 0, 0f);
 
 
   /* set of containers that are allocated containers */
@@ -339,4 +344,37 @@ public abstract class SchedulerNode {
       return this.labels.iterator().next();
     }
   }
+
+  /**
+   * Set the resource utilization of the containers in the node.
+   * @param containersUtilization Resource utilization of the containers.
+   */
+  public void setAggregatedContainersUtilization(
+      ResourceUtilization containersUtilization) {
+    this.containersUtilization = containersUtilization;
+  }
+
+  /**
+   * Get the resource utilization of the containers in the node.
+   * @return Resource utilization of the containers.
+   */
+  public ResourceUtilization getAggregatedContainersUtilization() {
+    return this.containersUtilization;
+  }
+
+  /**
+   * Set the resource utilization of the node. This includes the containers.
+   * @param nodeUtilization Resource utilization of the node.
+   */
+  public void setNodeUtilization(ResourceUtilization nodeUtilization) {
+    this.nodeUtilization = nodeUtilization;
+  }
+
+  /**
+   * Get the resource utilization of the node.
+   * @return Resource utilization of the node.
+   */
+  public ResourceUtilization getNodeUtilization() {
+    return this.nodeUtilization;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e14aecf..782ed03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1063,6 +1063,11 @@ public class CapacityScheduler extends
       releaseResources);
     schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
 
+    // Updating node resource utilization
+    node.setAggregatedContainersUtilization(
+        nm.getAggregatedContainersUtilization());
+    node.setNodeUtilization(nm.getNodeUtilization());
+
     // Now node data structures are upto date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
       LOG.debug("Node being looked for scheduling " + nm

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f26e506..f1839f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1069,6 +1069,11 @@ public class FairScheduler extends
       attemptScheduling(node);
     }
 
+    // Updating node resource utilization
+    node.setAggregatedContainersUtilization(
+        nm.getAggregatedContainersUtilization());
+    node.setNodeUtilization(nm.getNodeUtilization());
+
     long duration = getClock().getTime() - start;
     fsOpDurations.addNodeUpdateDuration(duration);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 5999eb7..cfae3a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -743,6 +743,10 @@ public class FifoScheduler extends
           completedContainer, RMContainerEventType.FINISHED);
     }
 
+    // Updating node resource utilization
+    node.setAggregatedContainersUtilization(
+        rmNode.getAggregatedContainersUtilization());
+    node.setNodeUtilization(rmNode.getNodeUtilization());
 
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 92f3edf..96207f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -110,11 +111,14 @@ public class MockNodes {
     private long lastHealthReportTime;
     private NodeState state;
     private Set<String> labels;
+    private ResourceUtilization containersUtilization;
+    private ResourceUtilization nodeUtilization;
 
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState 
state,
-        Set<String> labels) {
+        Set<String> labels, ResourceUtilization containersUtilization,
+        ResourceUtilization nodeUtilization) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
       this.httpAddress = httpAddress;
@@ -126,6 +130,8 @@ public class MockNodes {
       this.hostName = hostName;
       this.state = state;
       this.labels = labels;
+      this.containersUtilization = containersUtilization;
+      this.nodeUtilization = nodeUtilization;
     }
 
     @Override
@@ -244,6 +250,16 @@ public class MockNodes {
     public List<Container> pullNewlyIncreasedContainers() {
       return Collections.emptyList();
     }
+
+    @Override
+    public ResourceUtilization getAggregatedContainersUtilization() {
+      return this.containersUtilization;
+    }
+
+    @Override
+    public ResourceUtilization getNodeUtilization() {
+      return this.nodeUtilization;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
@@ -254,18 +270,19 @@ public class MockNodes {
   private static RMNode buildRMNode(int rack, final Resource perNode,
       NodeState state, String httpAddr, Set<String> labels) {
     return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123,
-        labels);
+        labels, null, null);
   }
   
   private static RMNode buildRMNode(int rack, final Resource perNode,
       NodeState state, String httpAddr, int hostnum, String hostName, int 
port) {
     return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
-        null);
+        null, null, null);
   }
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
       NodeState state, String httpAddr, int hostnum, String hostName, int port,
-      Set<String> labels) {
+      Set<String> labels, ResourceUtilization containersUtilization,
+      ResourceUtilization nodeUtilization) {
     final String rackName = "rack"+ rack;
     final int nid = hostnum;
     final String nodeAddr = hostName + ":" + nid;
@@ -277,7 +294,8 @@ public class MockNodes {
     final String httpAddress = httpAddr;
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
     return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
-        rackName, healthReport, 0, nid, hostName, state, labels);
+        rackName, healthReport, 0, nid, hostName, state, labels,
+        containersUtilization, nodeUtilization);
   }
 
   public static RMNode nodeInfo(int rack, final Resource perNode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index a6e1575..33a077d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
@@ -647,8 +648,9 @@ public class TestRMNodeTransitions {
     statusList.add(status);
     NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
         "", System.currentTimeMillis());
-    node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
-        statusList, null, null));
+    NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, 0, statusList, null,
+        nodeHealth, null, null, null);
+    node.handle(new RMNodeStatusEvent(nodeId, nodeStatus, null));
 
     Assert.assertEquals(1, node.getRunningApps().size());
 
@@ -689,8 +691,9 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = getRunningNode();
     NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
         System.currentTimeMillis());
-    node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
-        new ArrayList<ContainerStatus>(), null, null));
+    NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
+      new ArrayList<ContainerStatus>(), null, status, null, null, null);
+    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
     Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
     return node;
   }
@@ -863,8 +866,9 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = getDecommissioningNode();
     NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
         System.currentTimeMillis());
-    node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
-        new ArrayList<ContainerStatus>(), null, null));
+    NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
+        new ArrayList<ContainerStatus>(), null, status, null, null, null);
+    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 1f307aa..087199d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -163,9 +164,11 @@ public class TestRMAppLogAggregationStatus {
         LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
           messageForNode1_1);
     node1ReportForApp.add(report1);
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
-      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node1ReportForApp, null));
+    NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
+        new ArrayList<ContainerStatus>(), null,
+        NodeHealthStatus.newInstance(true, null, 0), null, null, null);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+        node1ReportForApp));
 
     List<LogAggregationReport> node2ReportForApp =
         new ArrayList<LogAggregationReport>();
@@ -175,9 +178,11 @@ public class TestRMAppLogAggregationStatus {
         LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode2_1);
     node2ReportForApp.add(report2);
-    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
-      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node2ReportForApp, null));
+    NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
+        new ArrayList<ContainerStatus>(), null,
+        NodeHealthStatus.newInstance(true, null, 0), null, null, null);
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+        node2ReportForApp));
     // node1 and node2 has updated its log aggregation status
     // verify that the log aggregation status for node1, node2
     // has been changed
@@ -213,9 +218,8 @@ public class TestRMAppLogAggregationStatus {
         LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode1_2);
     node1ReportForApp2.add(report1_2);
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
-      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node1ReportForApp2, null));
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+        node1ReportForApp2));
 
     // verify that the log aggregation status for node1
     // has been changed
@@ -282,9 +286,8 @@ public class TestRMAppLogAggregationStatus {
       LogAggregationStatus.SUCCEEDED, ""));
     // For every logAggregationReport cached in memory, we can only save at 
most
     // 10 diagnostic messages/failure messages
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
-      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node1ReportForApp3, null));
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+        node1ReportForApp3));
 
     logAggregationStatus = rmApp.getLogAggregationReportsForApp();
     Assert.assertEquals(2, logAggregationStatus.size());
@@ -327,9 +330,8 @@ public class TestRMAppLogAggregationStatus {
           LogAggregationStatus.FAILED, "");
     node2ReportForApp2.add(report2_2);
     node2ReportForApp2.add(report2_3);
-    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
-      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node2ReportForApp2, null));
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+        node2ReportForApp2));
     Assert.assertEquals(LogAggregationStatus.FAILED,
       rmApp.getLogAggregationStatusForAppReport());
     logAggregationStatus = rmApp.getLogAggregationReportsForApp();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index ec20bba..206edb1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -142,8 +143,9 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
         .get(nm3.getNodeId());
     NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false,
         "test health report", System.currentTimeMillis());
-    node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth,
-        new ArrayList<ContainerStatus>(), null, null));
+    NodeStatus nodeStatus = NodeStatus.newInstance(nm3.getNodeId(), 1,
+      new ArrayList<ContainerStatus>(), null, nodeHealth, null, null, null);
+    node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeStatus, null));
     rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);
 
     ClientResponse response =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index aa796ed..68c9efd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -57,6 +57,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
@@ -594,19 +595,66 @@ public class MiniYARNCluster extends CompositeService {
     }
   }
 
-  private class CustomNodeManager extends NodeManager {
+  public class CustomNodeManager extends NodeManager {
+    protected NodeStatus nodeStatus;
+
+    public void setNodeStatus(NodeStatus status) {
+      this.nodeStatus = status;
+    }
+
+    /**
+     * Hook to allow modification/replacement of NodeStatus
+     * @param currentStatus Current status.
+     * @return New node status.
+     */
+    protected NodeStatus getSimulatedNodeStatus(NodeStatus currentStatus) {
+      if(nodeStatus == null) {
+        return currentStatus;
+      } else {
+        // Increment response ID, the RMNodeStatusEvent will not get recorded
+        // for a duplicate heartbeat
+        nodeStatus.setResponseId(nodeStatus.getResponseId() + 1);
+        return nodeStatus;
+      }
+    }
+
     @Override
     protected void doSecureLogin() throws IOException {
       // Don't try to login using keytab in the testcase.
     }
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new NodeStatusUpdaterImpl(context,
+          dispatcher,
+          healthChecker,
+          metrics) {
+
+        // Allow simulation of nodestatus
+        @Override
+        protected NodeStatus getNodeStatus(int responseId) throws IOException {
+          return getSimulatedNodeStatus(super.getNodeStatus(responseId));
+        }
+      };
+    }
   }
 
   private class ShortCircuitedNodeManager extends CustomNodeManager {
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      return new NodeStatusUpdaterImpl(context, dispatcher,
-          healthChecker, metrics) {
+      return new NodeStatusUpdaterImpl(context,
+          dispatcher,
+          healthChecker,
+          metrics) {
+
+        // Allow simulation of nodestatus
+        @Override
+        protected NodeStatus getNodeStatus(int responseId) throws IOException {
+          return getSimulatedNodeStatus(super.getNodeStatus(responseId));
+        }
+
         @Override
         protected ResourceTracker getRMClient() {
           final ResourceTrackerService rt =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
new file mode 100644
index 0000000..77f6492
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster.CustomNodeManager;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMiniYarnClusterNodeUtilization {
+  // Mini YARN cluster setup
+  private static final int NUM_RM = 1;
+  private static final int NUM_NM = 1;
+
+  // Values for the first round
+  private static final int CONTAINER_PMEM_1 = 1024;
+  private static final int CONTAINER_VMEM_1 = 2048;
+  private static final float CONTAINER_CPU_1 = 11.0f;
+
+  private static final int NODE_PMEM_1 = 10240;
+  private static final int NODE_VMEM_1 = 20480;
+  private static final float NODE_CPU_1 = 51.0f;
+
+  // Values for the second round
+  private static final int CONTAINER_PMEM_2 = 2048;
+  private static final int CONTAINER_VMEM_2 = 4096;
+  private static final float CONTAINER_CPU_2 = 22.0f;
+
+  private static final int NODE_PMEM_2 = 20480;
+  private static final int NODE_VMEM_2 = 40960;
+  private static final float NODE_CPU_2 = 61.0f;
+
+  private MiniYARNCluster cluster;
+  private CustomNodeManager nm;
+
+  private Configuration conf;
+
+  private NodeStatus nodeStatus;
+
+  @Before
+  public void setup() {
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    String name = TestMiniYarnClusterNodeUtilization.class.getName();
+    cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
+    cluster.init(conf);
+    cluster.start();
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+
+    nm = (CustomNodeManager)cluster.getNodeManager(0);
+    int responseId = 1;
+    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
+        CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1,
+        NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1);
+    nm.setNodeStatus(nodeStatus);
+  }
+
+  /**
+   * Simulates a NM heartbeat using the simulated NodeStatus fixture. Verify
+   * both the RMNode and SchedulerNode have been updated with the new
+   * utilization.
+   */
+  @Test(timeout=60000)
+  public void testUpdateNodeUtilization()
+      throws InterruptedException, IOException, YarnException {
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(10000));
+
+    // Simulate heartbeat using NodeStatus fixture
+    NodeHeartbeatRequest request =
+        NodeHeartbeatRequest.newInstance(nodeStatus, null, null, null);
+    ResourceTracker tracker =
+        ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
+    tracker.nodeHeartbeat(request);
+
+    // Give the heartbeat time to propagate to the RM
+    verifySimulatedUtilization();
+
+    // Alter utilization
+    int responseId = 10;
+    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
+        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
+        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
+    nm.setNodeStatus(nodeStatus);
+    tracker.nodeHeartbeat(request);
+
+    // Give the heartbeat time to propagate to the RM
+    verifySimulatedUtilization();
+  }
+
+  /**
+   * Trigger the NM to send a heartbeat using the simulated NodeStatus fixture.
+   * Verify both the RMNode and SchedulerNode have been updated with the new
+   * utilization.
+   */
+  @Test(timeout=60000)
+  public void testMockNodeStatusHeartbeat()
+      throws InterruptedException, YarnException {
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(10000));
+
+    NodeStatusUpdater updater = nm.getNodeStatusUpdater();
+    updater.sendOutofBandHeartBeat();
+
+    // Give the heartbeat time to propagate to the RM
+    verifySimulatedUtilization();
+
+    // Alter utilization
+    int responseId = 20;
+    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
+        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
+        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
+    nm.setNodeStatus(nodeStatus);
+    updater.sendOutofBandHeartBeat();
+
+    verifySimulatedUtilization();
+  }
+
+  /**
+   * Create a NodeStatus test vector.
+   * @param nodeId Node identifier.
+   * @param responseId Response identifier.
+   * @param containerPMem Physical memory of the container.
+   * @param containerVMem Virtual memory of the container.
+   * @param containerCPU CPU percentage of the container.
+   * @param nodePMem Physical memory of the node.
+   * @param nodeVMem Virtual memory of the node.
+   * @param nodeCPU CPU percentage of the node.
+   */
+  private NodeStatus createNodeStatus(
+      NodeId nodeId,
+      int responseId,
+      int containerPMem,
+      int containerVMem,
+      float containerCPU,
+      int nodePMem,
+      int nodeVMem,
+      float nodeCPU) {
+
+    // Fake node status with fake utilization
+    ResourceUtilization containersUtilization =
+        ResourceUtilization.newInstance(containerPMem, containerVMem,
+            containerCPU);
+    ResourceUtilization nodeUtilization =
+        ResourceUtilization.newInstance(nodePMem, nodeVMem, nodeCPU);
+    NodeStatus status = NodeStatus.newInstance(
+        nodeId,
+        responseId,
+        new ArrayList<ContainerStatus>(),
+        null,
+        NodeHealthStatus.newInstance(true, null, 0),
+        containersUtilization,
+        nodeUtilization,
+        null);
+
+    return status;
+  }
+
+  /**
+   * Verify both the RMNode and SchedulerNode have been updated with the test
+   * fixture utilization data.
+   * The expected containers and node utilization are read from the
+   * {@code nodeStatus} fixture field; this method takes no parameters.
+   */
+  private void verifySimulatedUtilization() throws InterruptedException {
+    ResourceManager rm = cluster.getResourceManager(0);
+    RMContext rmContext = rm.getRMContext();
+
+    ResourceUtilization containersUtilization =
+        nodeStatus.getContainersUtilization();
+    ResourceUtilization nodeUtilization =
+        nodeStatus.getNodeUtilization();
+
+    // Poll the RM nodes for the new nodeUtilization (100 x 100 ms). NOTE: the
+    // break only exits the inner for loop, so the full 10 seconds always pass.
+    for (int i=0; i<100; i++) {
+      for (RMNode ni : rmContext.getRMNodes().values()) {
+        if (ni.getNodeUtilization().equals(nodeUtilization)) {
+          break;
+        }
+      }
+      Thread.sleep(100);
+    }
+
+    // Verify the data is readable from the RM and scheduler nodes
+    for (RMNode ni : rmContext.getRMNodes().values()) {
+      ResourceUtilization cu = ni.getAggregatedContainersUtilization();
+      assertEquals("Containers Utillization not propagated to RMNode",
+          containersUtilization, cu);
+
+      ResourceUtilization nu = ni.getNodeUtilization();
+      assertEquals("Node Utillization not propagated to RMNode",
+          nodeUtilization, nu);
+
+      SchedulerNode scheduler =
+          rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
+      cu = scheduler.getAggregatedContainersUtilization();
+      assertEquals("Containers Utillization not propagated to SchedulerNode",
+          containersUtilization, cu);
+
+      nu = scheduler.getNodeUtilization();
+      assertEquals("Node Utillization not propagated to SchedulerNode",
+          nodeUtilization, nu);
+    }
+  }
+}

Reply via email to