YARN-3980. Plumb resource-utilization info in node heartbeat through to the scheduler. (Inigo Goiri via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52948bb2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52948bb2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52948bb2 Branch: refs/heads/yarn-2877 Commit: 52948bb20bd1446164df1d3920c46c96dad750ae Parents: f80dc6f Author: Karthik Kambatla <ka...@apache.org> Authored: Tue Nov 24 10:05:12 2015 +0530 Committer: Karthik Kambatla <ka...@apache.org> Committed: Tue Nov 24 13:47:17 2015 +0530 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 11 + .../yarn/sls/scheduler/RMNodeWrapper.java | 11 + hadoop-yarn-project/CHANGES.txt | 3 + .../impl/pb/ResourceUtilizationPBImpl.java | 2 +- .../nodemanager/NodeStatusUpdaterImpl.java | 3 +- .../resourcemanager/ResourceTrackerService.java | 5 +- .../server/resourcemanager/rmnode/RMNode.java | 15 +- .../resourcemanager/rmnode/RMNodeImpl.java | 55 +++++ .../rmnode/RMNodeStatusEvent.java | 53 ++-- .../scheduler/SchedulerNode.java | 38 +++ .../scheduler/capacity/CapacityScheduler.java | 5 + .../scheduler/fair/FairScheduler.java | 5 + .../scheduler/fifo/FifoScheduler.java | 4 + .../yarn/server/resourcemanager/MockNodes.java | 28 ++- .../resourcemanager/TestRMNodeTransitions.java | 16 +- .../TestRMAppLogAggregationStatus.java | 32 +-- .../webapp/TestRMWebServicesNodes.java | 6 +- .../hadoop/yarn/server/MiniYARNCluster.java | 54 +++- .../TestMiniYarnClusterNodeUtilization.java | 245 +++++++++++++++++++ 19 files changed, 522 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index dae2ce7..f5943a8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -188,6 +189,16 @@ public class NodeInfo { // TODO Auto-generated method stub return null; } + + @Override + public ResourceUtilization getAggregatedContainersUtilization() { + return null; + } + + @Override + public ResourceUtilization getNodeUtilization() { + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 8c65ccc..e778188 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -28,6 +28,7 @@ import 
org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -176,4 +177,14 @@ public class RMNodeWrapper implements RMNode { // TODO Auto-generated method stub return null; } + + @Override + public ResourceUtilization getAggregatedContainersUtilization() { + return node.getAggregatedContainersUtilization(); + } + + @Override + public ResourceUtilization getNodeUtilization() { + return node.getNodeUtilization(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a76c835..0532e1d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -568,6 +568,9 @@ Release 2.8.0 - UNRELEASED YARN-3454. Add efficient merge operation to RLESparseResourceAllocation (Carlo Curino via asuresh) + YARN-3980. Plumb resource-utilization info in node heartbeat through to the + scheduler. (Inigo Goiri via kasha) + OPTIMIZATIONS YARN-3339. 
TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java index 01cda7a..29f79d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -75,7 +75,7 @@ public class ResourceUtilizationPBImpl extends ResourceUtilization { @Override public void setVirtualMemory(int vmem) { maybeInitBuilder(); - builder.setPmem(vmem); + builder.setVmem(vmem); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3f8cf32..34267b3 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -417,7 +417,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return appList; } - private NodeStatus getNodeStatus(int responseId) throws IOException { + @VisibleForTesting + protected NodeStatus getNodeStatus(int responseId) throws IOException { NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3638a19..bd24b25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -461,10 +461,7 @@ public class ResourceTrackerService extends AbstractService implements // 4. Send status to RMNode, saving the latest response. 
RMNodeStatusEvent nodeStatusEvent = - new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse, - remoteNodeStatus.getIncreasedContainers()); + new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index f28422a..1a172e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; /** * Node managers information on available resources @@ -98,7 +99,19 @@ public interface RMNode { * @return the total available resource. 
*/ public Resource getTotalCapability(); - + + /** + * The aggregated resource utilization of the containers. + * @return the aggregated resource utilization of the containers. + */ + public ResourceUtilization getAggregatedContainersUtilization(); + + /** + * The total resource utilization of the node. + * @return the total resource utilization of the node. + */ + public ResourceUtilization getNodeUtilization(); + /** * The rack name for this node manager. * @return the rack name. http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e0d27d6..146b0d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import 
org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; @@ -114,6 +115,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private long lastHealthReportTime; private String nodeManagerVersion; + /* Aggregated resource utilization for the containers. */ + private ResourceUtilization containersUtilization; + /* Resource utilization for the node. */ + private ResourceUtilization nodeUtilization; + private final ContainerAllocationExpirer containerAllocationExpirer; /* set of containers that have just launched */ private final Set<ContainerId> launchedContainers = @@ -446,6 +452,49 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } @Override + public ResourceUtilization getAggregatedContainersUtilization() { + this.readLock.lock(); + + try { + return this.containersUtilization; + } finally { + this.readLock.unlock(); + } + } + + public void setAggregatedContainersUtilization( + ResourceUtilization containersUtilization) { + this.writeLock.lock(); + + try { + this.containersUtilization = containersUtilization; + } finally { + this.writeLock.unlock(); + } + } + + @Override + public ResourceUtilization getNodeUtilization() { + this.readLock.lock(); + + try { + return this.nodeUtilization; + } finally { + this.readLock.unlock(); + } + } + + public void setNodeUtilization(ResourceUtilization nodeUtilization) { + this.writeLock.lock(); + + try { + this.nodeUtilization = nodeUtilization; + } finally { + this.writeLock.unlock(); + } + } + + @Override public NodeState getState() { this.readLock.lock(); @@ -1006,6 +1055,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setLastHealthReportTime( remoteNodeHealthStatus.getLastHealthReportTime()); + rmNode.setAggregatedContainersUtilization( + statusEvent.getAggregatedContainersUtilization()); + 
rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); NodeState initialState = rmNode.getState(); boolean isNodeDecommissioning = initialState.equals(NodeState.DECOMMISSIONING); @@ -1083,6 +1135,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setLastHealthReportTime( remoteNodeHealthStatus.getLastHealthReportTime()); + rmNode.setAggregatedContainersUtilization( + statusEvent.getAggregatedContainersUtilization()); + rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); if (remoteNodeHealthStatus.getIsNodeHealthy()) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 0dbea1a..afc417d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -28,52 +28,35 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import 
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; public class RMNodeStatusEvent extends RMNodeEvent { - private final NodeHealthStatus nodeHealthStatus; - private final List<ContainerStatus> containersCollection; + private final NodeStatus nodeStatus; private final NodeHeartbeatResponse latestResponse; - private final List<ApplicationId> keepAliveAppIds; private List<LogAggregationReport> logAggregationReportsForApps; - private final List<Container> nmReportedIncreasedContainers; - - // Used by tests - public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, - List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, - NodeHeartbeatResponse latestResponse) { - this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, - latestResponse, null); - } - public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, - List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, - NodeHeartbeatResponse latestResponse, - List<Container> nmReportedIncreasedContainers) { - this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, latestResponse, - null, nmReportedIncreasedContainers); + public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, + NodeHeartbeatResponse latestResponse) { + this(nodeId, nodeStatus, latestResponse, null); } - public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, - List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, + public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, NodeHeartbeatResponse latestResponse, - List<LogAggregationReport> logAggregationReportsForApps, - List<Container> nmReportedIncreasedContainers) { + List<LogAggregationReport> logAggregationReportsForApps) { super(nodeId, 
RMNodeEventType.STATUS_UPDATE); - this.nodeHealthStatus = nodeHealthStatus; - this.containersCollection = collection; - this.keepAliveAppIds = keepAliveAppIds; + this.nodeStatus = nodeStatus; this.latestResponse = latestResponse; this.logAggregationReportsForApps = logAggregationReportsForApps; - this.nmReportedIncreasedContainers = nmReportedIncreasedContainers; } public NodeHealthStatus getNodeHealthStatus() { - return this.nodeHealthStatus; + return this.nodeStatus.getNodeHealthStatus(); } public List<ContainerStatus> getContainers() { - return this.containersCollection; + return this.nodeStatus.getContainersStatuses(); } public NodeHeartbeatResponse getLatestResponse() { @@ -81,7 +64,15 @@ public class RMNodeStatusEvent extends RMNodeEvent { } public List<ApplicationId> getKeepAliveAppIds() { - return this.keepAliveAppIds; + return this.nodeStatus.getKeepAliveApplications(); + } + + public ResourceUtilization getAggregatedContainersUtilization() { + return this.nodeStatus.getContainersUtilization(); + } + + public ResourceUtilization getNodeUtilization() { + return this.nodeStatus.getNodeUtilization(); } public List<LogAggregationReport> getLogAggregationReportsForApps() { @@ -95,7 +86,7 @@ public class RMNodeStatusEvent extends RMNodeEvent { @SuppressWarnings("unchecked") public List<Container> getNMReportedIncreasedContainers() { - return nmReportedIncreasedContainers == null ? Collections.EMPTY_LIST - : nmReportedIncreasedContainers; + return this.nodeStatus.getIncreasedContainers() == null ? 
+ Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index f3d3906..e8e1238 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -58,6 +59,10 @@ public abstract class SchedulerNode { private Resource totalResourceCapability; private RMContainer reservedContainer; private volatile int numContainers; + private volatile ResourceUtilization containersUtilization = + ResourceUtilization.newInstance(0, 0, 
0f); + private volatile ResourceUtilization nodeUtilization = + ResourceUtilization.newInstance(0, 0, 0f); /* set of containers that are allocated containers */ @@ -339,4 +344,37 @@ public abstract class SchedulerNode { return this.labels.iterator().next(); } } + + /** + * Set the resource utilization of the containers in the node. + * @param containersUtilization Resource utilization of the containers. + */ + public void setAggregatedContainersUtilization( + ResourceUtilization containersUtilization) { + this.containersUtilization = containersUtilization; + } + + /** + * Get the resource utilization of the containers in the node. + * @return Resource utilization of the containers. + */ + public ResourceUtilization getAggregatedContainersUtilization() { + return this.containersUtilization; + } + + /** + * Set the resource utilization of the node. This includes the containers. + * @param nodeUtilization Resource utilization of the node. + */ + public void setNodeUtilization(ResourceUtilization nodeUtilization) { + this.nodeUtilization = nodeUtilization; + } + + /** + * Get the resource utilization of the node. + * @return Resource utilization of the node. 
+ */ + public ResourceUtilization getNodeUtilization() { + return this.nodeUtilization; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e14aecf..782ed03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1063,6 +1063,11 @@ public class CapacityScheduler extends releaseResources); schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + // Now node data structures are upto date and ready for scheduling. 
if(LOG.isDebugEnabled()) { LOG.debug("Node being looked for scheduling " + nm http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f26e506..f1839f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1069,6 +1069,11 @@ public class FairScheduler extends attemptScheduling(node); } + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 5999eb7..cfae3a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -743,6 +743,10 @@ public class FifoScheduler extends completedContainer, RMContainerEventType.FINISHED); } + // Updating node resource utilization + node.setAggregatedContainersUtilization( + rmNode.getAggregatedContainersUtilization()); + node.setNodeUtilization(rmNode.getNodeUtilization()); if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 92f3edf..96207f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -110,11 +111,14 @@ public class MockNodes { private long lastHealthReportTime; private NodeState state; private Set<String> labels; + private ResourceUtilization containersUtilization; + private ResourceUtilization nodeUtilization; public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, - Set<String> labels) { + Set<String> labels, ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -126,6 +130,8 @@ public class MockNodes { this.hostName = hostName; this.state = state; this.labels = labels; + this.containersUtilization = containersUtilization; + this.nodeUtilization = nodeUtilization; } @Override @@ -244,6 +250,16 @@ public class MockNodes { public List<Container> pullNewlyIncreasedContainers() { return Collections.emptyList(); } + + @Override + public ResourceUtilization getAggregatedContainersUtilization() { + return this.containersUtilization; + } + + @Override + public ResourceUtilization getNodeUtilization() { + return this.nodeUtilization; + } }; private 
static RMNode buildRMNode(int rack, final Resource perNode, @@ -254,18 +270,19 @@ public class MockNodes { private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, Set<String> labels) { return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, - labels); + labels, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port) { return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, - null); + null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port, - Set<String> labels) { + Set<String> labels, ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; @@ -277,7 +294,8 @@ public class MockNodes { final String httpAddress = httpAddr; String healthReport = (state == NodeState.UNHEALTHY) ? 
null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, - rackName, healthReport, 0, nid, hostName, state, labels); + rackName, healthReport, 0, nid, hostName, state, labels, + containersUtilization, nodeUtilization); } public static RMNode nodeInfo(int rack, final Resource perNode, http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index a6e1575..33a077d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; @@ -647,8 +648,9 @@ public class TestRMNodeTransitions { 
statusList.add(status); NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, "", System.currentTimeMillis()); - node.handle(new RMNodeStatusEvent(nodeId, nodeHealth, - statusList, null, null)); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, 0, statusList, null, + nodeHealth, null, null, null); + node.handle(new RMNodeStatusEvent(nodeId, nodeStatus, null)); Assert.assertEquals(1, node.getRunningApps().size()); @@ -689,8 +691,9 @@ public class TestRMNodeTransitions { RMNodeImpl node = getRunningNode(); NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); - node.handle(new RMNodeStatusEvent(node.getNodeID(), status, - new ArrayList<ContainerStatus>(), null, null)); + NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + new ArrayList<ContainerStatus>(), null, status, null, null, null); + node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; } @@ -863,8 +866,9 @@ public class TestRMNodeTransitions { RMNodeImpl node = getDecommissioningNode(); NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); - node.handle(new RMNodeStatusEvent(node.getNodeID(), status, - new ArrayList<ContainerStatus>(), null, null)); + NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + new ArrayList<ContainerStatus>(), null, status, null, null, null); + node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 1f307aa..087199d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; @@ -163,9 +164,11 @@ public class TestRMAppLogAggregationStatus { LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode1_1); node1ReportForApp.add(report1); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus - .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, - null, node1ReportForApp, null)); + NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0, + new ArrayList<ContainerStatus>(), null, + NodeHealthStatus.newInstance(true, null, 0), null, null, null); + node1.handle(new 
RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1ReportForApp)); List<LogAggregationReport> node2ReportForApp = new ArrayList<LogAggregationReport>(); @@ -175,9 +178,11 @@ public class TestRMAppLogAggregationStatus { LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode2_1); node2ReportForApp.add(report2); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus - .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, - null, node2ReportForApp, null)); + NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0, + new ArrayList<ContainerStatus>(), null, + NodeHealthStatus.newInstance(true, null, 0), null, null, null); + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2ReportForApp)); // node1 and node2 has updated its log aggregation status // verify that the log aggregation status for node1, node2 // has been changed @@ -213,9 +218,8 @@ public class TestRMAppLogAggregationStatus { LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode1_2); node1ReportForApp2.add(report1_2); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus - .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, - null, node1ReportForApp2, null)); + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1ReportForApp2)); // verify that the log aggregation status for node1 // has been changed @@ -282,9 +286,8 @@ public class TestRMAppLogAggregationStatus { LogAggregationStatus.SUCCEEDED, "")); // For every logAggregationReport cached in memory, we can only save at most // 10 diagnostic messages/failure messages - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus - .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, - null, node1ReportForApp3, null)); + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1ReportForApp3)); 
logAggregationStatus = rmApp.getLogAggregationReportsForApp(); Assert.assertEquals(2, logAggregationStatus.size()); @@ -327,9 +330,8 @@ public class TestRMAppLogAggregationStatus { LogAggregationStatus.FAILED, ""); node2ReportForApp2.add(report2_2); node2ReportForApp2.add(report2_3); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus - .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, - null, node2ReportForApp2, null)); + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2ReportForApp2)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport()); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index ec20bba..206edb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; 
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -142,8 +143,9 @@ public class TestRMWebServicesNodes extends JerseyTestBase { .get(nm3.getNodeId()); NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false, "test health report", System.currentTimeMillis()); - node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth, - new ArrayList<ContainerStatus>(), null, null)); + NodeStatus nodeStatus = NodeStatus.newInstance(nm3.getNodeId(), 1, + new ArrayList<ContainerStatus>(), null, nodeHealth, null, null, null); + node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeStatus, null)); rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY); ClientResponse response = http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index aa796ed..68c9efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; @@ -594,19 +595,66 @@ public class MiniYARNCluster extends CompositeService { } } - private class CustomNodeManager extends NodeManager { + public class CustomNodeManager extends NodeManager { + protected NodeStatus nodeStatus; + + public void setNodeStatus(NodeStatus status) { + this.nodeStatus = status; + } + + /** + * Hook to allow modification/replacement of NodeStatus + * @param currentStatus Current status. + * @return New node status. + */ + protected NodeStatus getSimulatedNodeStatus(NodeStatus currentStatus) { + if(nodeStatus == null) { + return currentStatus; + } else { + // Increment response ID, the RMNodeStatusEvent will not get recorded + // for a duplicate heartbeat + nodeStatus.setResponseId(nodeStatus.getResponseId() + 1); + return nodeStatus; + } + } + @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. 
} + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new NodeStatusUpdaterImpl(context, + dispatcher, + healthChecker, + metrics) { + + // Allow simulation of nodestatus + @Override + protected NodeStatus getNodeStatus(int responseId) throws IOException { + return getSimulatedNodeStatus(super.getNodeStatus(responseId)); + } + }; + } } private class ShortCircuitedNodeManager extends CustomNodeManager { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new NodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics) { + return new NodeStatusUpdaterImpl(context, + dispatcher, + healthChecker, + metrics) { + + // Allow simulation of nodestatus + @Override + protected NodeStatus getNodeStatus(int responseId) throws IOException { + return getSimulatedNodeStatus(super.getNodeStatus(responseId)); + } + @Override protected ResourceTracker getRMClient() { final ResourceTrackerService rt = http://git-wip-us.apache.org/repos/asf/hadoop/blob/52948bb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java new file mode 100644 index 0000000..77f6492 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java @@ -0,0 +1,245 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster.CustomNodeManager; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMiniYarnClusterNodeUtilization {
+  // Mini YARN cluster setup
+  private static final int NUM_RM = 1;
+  private static final int NUM_NM = 1;
+
+  // Values for the first round
+  private static final int CONTAINER_PMEM_1 = 1024;
+  private static final int CONTAINER_VMEM_1 = 2048;
+  private static final float CONTAINER_CPU_1 = 11.0f;
+
+  private static final int NODE_PMEM_1 = 10240;
+  private static final int NODE_VMEM_1 = 20480;
+  private static final float NODE_CPU_1 = 51.0f;
+
+  // Values for the second round
+  private static final int CONTAINER_PMEM_2 = 2048;
+  private static final int CONTAINER_VMEM_2 = 4096;
+  private static final float CONTAINER_CPU_2 = 22.0f;
+
+  private static final int NODE_PMEM_2 = 20480;
+  private static final int NODE_VMEM_2 = 40960;
+  private static final float NODE_CPU_2 = 61.0f;
+
+  private MiniYARNCluster cluster;
+  private CustomNodeManager nm;
+
+  private Configuration conf;
+
+  // Simulated status currently installed on the NM; the verification step
+  // reads its expected utilization values from this object.
+  private NodeStatus nodeStatus;
+
+  /**
+   * Starts a mini YARN cluster and installs the first-round simulated
+   * NodeStatus on its node manager.
+   */
+  @Before
+  public void setup() {
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    String name = TestMiniYarnClusterNodeUtilization.class.getName();
+    cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
+    cluster.init(conf);
+    cluster.start();
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+
+    nm = (CustomNodeManager)cluster.getNodeManager(0);
+    int responseId = 1;
+    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
+        CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1,
+        NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1);
+    nm.setNodeStatus(nodeStatus);
+  }
+
+  /**
+   * Stops the mini YARN cluster started by {@link #setup()} so that a
+   * running cluster is not left behind after each test.
+   */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stop();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Simulates a NM heartbeat using the simulated NodeStatus fixture.
Verify
+   * both the RMNode and SchedulerNode have been updated with the new
+   * utilization.
+   */
+  @Test(timeout=60000)
+  public void testUpdateNodeUtilization()
+      throws InterruptedException, IOException, YarnException {
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(10000));
+
+    // Simulate heartbeat using NodeStatus fixture
+    NodeHeartbeatRequest request =
+        NodeHeartbeatRequest.newInstance(nodeStatus, null, null, null);
+    ResourceTracker tracker =
+        ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
+    tracker.nodeHeartbeat(request);
+
+    // Give the heartbeat time to propagate to the RM
+    verifySimulatedUtilization();
+
+    // Alter utilization
+    int responseId = 10;
+    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
+        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
+        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
+    nm.setNodeStatus(nodeStatus);
+    // Rebuild the request: the first one was created from the previous
+    // NodeStatus object, so re-sending it would report the first-round
+    // utilization instead of the altered one.
+    request = NodeHeartbeatRequest.newInstance(nodeStatus, null, null, null);
+    tracker.nodeHeartbeat(request);
+
+    // Give the heartbeat time to propagate to the RM
+    verifySimulatedUtilization();
+  }
+
+  /**
+   * Trigger the NM to send a heartbeat using the simulated NodeStatus fixture.
+   * Verify both the RMNode and SchedulerNode have been updated with the new
+   * utilization.
+   */
+  @Test(timeout=60000)
+  public void testMockNodeStatusHeartbeat()
+      throws InterruptedException, YarnException {
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(10000));
+
+    NodeStatusUpdater updater = nm.getNodeStatusUpdater();
+    updater.sendOutofBandHeartBeat();
+
+    // Give the heartbeat time to propagate to the RM
+    verifySimulatedUtilization();
+
+    // Alter utilization
+    int responseId = 20;
+    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
+        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
+        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
+    nm.setNodeStatus(nodeStatus);
+    updater.sendOutofBandHeartBeat();
+
+    verifySimulatedUtilization();
+  }
+
+  /**
+   * Create a NodeStatus test vector.
+   * @param nodeId Node identifier.
+   * @param responseId Response identifier.
+   * @param containerPMem Physical memory of the containers.
+   * @param containerVMem Virtual memory of the containers.
+   * @param containerCPU CPU percentage of the containers.
+   * @param nodePMem Physical memory of the node.
+   * @param nodeVMem Virtual memory of the node.
+   * @param nodeCPU CPU percentage of the node.
+   * @return A healthy NodeStatus with an empty container list that carries
+   *         the given containers and node utilization.
+   */
+  private NodeStatus createNodeStatus(
+      NodeId nodeId,
+      int responseId,
+      int containerPMem,
+      int containerVMem,
+      float containerCPU,
+      int nodePMem,
+      int nodeVMem,
+      float nodeCPU) {
+
+    // Fake utilization figures for the node and for its containers
+    ResourceUtilization fakeNodeUsage =
+        ResourceUtilization.newInstance(nodePMem, nodeVMem, nodeCPU);
+    ResourceUtilization fakeContainersUsage =
+        ResourceUtilization.newInstance(containerPMem, containerVMem,
+            containerCPU);
+    NodeHealthStatus healthy = NodeHealthStatus.newInstance(true, null, 0);
+
+    // Fake node status: no containers, healthy, fake utilization
+    return NodeStatus.newInstance(
+        nodeId,
+        responseId,
+        new ArrayList<ContainerStatus>(),
+        null,
+        healthy,
+        fakeContainersUsage,
+        fakeNodeUsage,
+        null);
+  }
+
+  /**
+   * Verify both the RMNode and SchedulerNode have been updated with the test
+   * fixture utilization data. The expected values are read from the current
+   * {@code nodeStatus} fixture.
+   */
+  private void verifySimulatedUtilization() throws InterruptedException {
+    ResourceManager rm = cluster.getResourceManager(0);
+    RMContext rmContext = rm.getRMContext();
+
+    ResourceUtilization containersUtilization =
+        nodeStatus.getContainersUtilization();
+    ResourceUtilization nodeUtilization =
+        nodeStatus.getNodeUtilization();
+
+    // Give the heartbeat time to propagate to the RM (max 10 seconds).
+    // Stop polling as soon as the simulated utilization is visible: the
+    // check lives in a helper because an unlabeled break inside a nested
+    // loop only leaves the inner loop, which would force the full wait.
+    for (int i = 0; i < 100; i++) {
+      if (isUtilizationPropagated(rmContext, containersUtilization,
+          nodeUtilization)) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+
+    // Verify the data is readable from the RM and scheduler nodes
+    for (RMNode ni : rmContext.getRMNodes().values()) {
+      ResourceUtilization cu = ni.getAggregatedContainersUtilization();
+      assertEquals("Containers Utillization not propagated to RMNode",
+          containersUtilization, cu);
+
+      ResourceUtilization nu = ni.getNodeUtilization();
+      assertEquals("Node Utillization not propagated to RMNode",
+          nodeUtilization, nu);
+
+      SchedulerNode scheduler =
+          rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
+      cu = scheduler.getAggregatedContainersUtilization();
+      assertEquals("Containers Utillization not propagated to SchedulerNode",
+          containersUtilization, cu);
+
+      nu = scheduler.getNodeUtilization();
+      assertEquals("Node Utillization not propagated to SchedulerNode",
+          nodeUtilization, nu);
+    }
+  }
+
+  /**
+   * Check whether every RMNode and its SchedulerNode already report the
+   * expected utilization.
+   * @param rmContext Context of the RM to inspect.
+   * @param containersUtilization Expected containers utilization (non-null).
+   * @param nodeUtilization Expected node utilization (non-null).
+   * @return true if at least one node is registered and all registered
+   *         nodes report the expected values on both the RMNode and the
+   *         SchedulerNode; false otherwise.
+   */
+  private static boolean isUtilizationPropagated(RMContext rmContext,
+      ResourceUtilization containersUtilization,
+      ResourceUtilization nodeUtilization) {
+    if (rmContext.getRMNodes().isEmpty()) {
+      return false;
+    }
+    for (RMNode ni : rmContext.getRMNodes().values()) {
+      SchedulerNode sn =
+          rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
+      // The expected value is the receiver of equals() so that a node which
+      // has not reported any utilization yet (null) is simply "not ready".
+      if (sn == null
+          || !nodeUtilization.equals(ni.getNodeUtilization())
+          || !containersUtilization.equals(
+              ni.getAggregatedContainersUtilization())
+          || !nodeUtilization.equals(sn.getNodeUtilization())
+          || !containersUtilization.equals(
+              sn.getAggregatedContainersUtilization())) {
+        return false;
+      }
+    }
+    return true;
+  }
+}