YARN-3445. Cache runningApps in RMNode for getting the running apps on a given NodeId. (Junping Du via mingma)
(cherry picked from commit 08244264c0583472b9c4e16591cfde72c6db62a2) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b169889f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b169889f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b169889f Branch: refs/heads/branch-2 Commit: b169889f01309757197a8a27b6244a87c77a3ce3 Parents: 5e6bbe6 Author: Ming Ma <min...@apache.org> Authored: Fri Jul 10 08:30:10 2015 -0700 Committer: Ming Ma <min...@apache.org> Committed: Fri Jul 10 08:34:01 2015 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 8 +++- .../yarn/sls/scheduler/RMNodeWrapper.java | 5 +++ hadoop-yarn-project/CHANGES.txt | 3 ++ .../server/resourcemanager/rmnode/RMNode.java | 2 + .../resourcemanager/rmnode/RMNodeImpl.java | 43 ++++++++++++++++---- .../yarn/server/resourcemanager/MockNodes.java | 5 +++ .../resourcemanager/TestRMNodeTransitions.java | 36 ++++++++++++++-- 7 files changed, 91 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index ee6eb7b..440779c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -62,7 +62,8 @@ public class NodeInfo { private NodeState state; private List<ContainerId> toCleanUpContainers; private List<ApplicationId> toCleanUpApplications; - + private 
List<ApplicationId> runningApplications; + public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, int cmdPort, String hostName, NodeState state) { @@ -77,6 +78,7 @@ public class NodeInfo { this.state = state; toCleanUpApplications = new ArrayList<ApplicationId>(); toCleanUpContainers = new ArrayList<ContainerId>(); + runningApplications = new ArrayList<ApplicationId>(); } public NodeId getNodeID() { @@ -135,6 +137,10 @@ public class NodeInfo { return toCleanUpApplications; } + public List<ApplicationId> getRunningApps() { + return runningApplications; + } + public void updateNodeHeartbeatResponseForCleanup( NodeHeartbeatResponse response) { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index b64be1b..a6633ae 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -119,6 +119,11 @@ public class RMNodeWrapper implements RMNode { } @Override + public List<ApplicationId> getRunningApps() { + return node.getRunningApps(); + } + + @Override public void updateNodeHeartbeatResponseForCleanup( NodeHeartbeatResponse nodeHeartbeatResponse) { node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2331b67..74705f2 100644 
--- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1637,6 +1637,9 @@ Release 2.6.0 - 2014-11-18 YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max share (Siqi Li via Sandy Ryza) + YARN-3445. Cache runningApps in RMNode for getting running apps on given + NodeId. (Junping Du via mingma) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 95eeaf6..0386be6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -119,6 +119,8 @@ public interface RMNode { public List<ApplicationId> getAppsToCleanup(); + List<ApplicationId> getRunningApps(); + /** * Update a {@link NodeHeartbeatResponse} with the list of containers and * applications to clean up for this node. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d1e6190..9bc91c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { new HashSet<ContainerId>(); /* the list of applications that have finished and need to be purged */ - private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>(); + private final List<ApplicationId> finishedApplications = + new ArrayList<ApplicationId>(); + + /* the list of applications that are running on this node */ + private final List<ApplicationId> runningApplications = + new ArrayList<ApplicationId>(); private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); - + private static final StateMachineFactory<RMNodeImpl, NodeState, RMNodeEventType, @@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { NodeState, RMNodeEventType, RMNodeEvent>(NodeState.NEW) - + //Transitions from NEW state .addTransition(NodeState.NEW, 
NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -383,6 +388,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } @Override + public List<ApplicationId> getRunningApps() { + this.readLock.lock(); + try { + return new ArrayList<ApplicationId>(this.runningApplications); + } finally { + this.readLock.unlock(); + } + } + + @Override public List<ContainerId> getContainersToCleanUp() { this.readLock.lock(); @@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { LOG.warn("Cannot get RMApp by appId=" + appId + ", just added it to finishedApplications list for cleanup"); rmNode.finishedApplications.add(appId); + rmNode.runningApplications.remove(appId); return; } + // Add running applications back due to Node add or Node reconnection. + rmNode.runningApplications.add(appId); context.getDispatcher().getEventHandler() .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); } @@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - rmNode.finishedApplications.add((( - RMNodeCleanAppEvent) event).getAppId()); + ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId(); + rmNode.finishedApplications.add(appId); + rmNode.runningApplications.remove(appId); } } @@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { + "cleanup, no further processing"); continue; } - if (finishedApplications.contains(containerId.getApplicationAttemptId() - .getApplicationId())) { + + ApplicationId containerAppId = + containerId.getApplicationAttemptId().getApplicationId(); + + if (finishedApplications.contains(containerAppId)) { LOG.info("Container " + containerId + " belongs to an application that is already killed," + " no further processing"); continue; + } else if (!runningApplications.contains(containerAppId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Container 
" + containerId + + " is the first container get launched for application " + + containerAppId); + } + runningApplications.add(containerAppId); } // Process running containers http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 2d863d1..095fe28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -187,6 +187,11 @@ public class MockNodes { } @Override + public List<ApplicationId> getRunningApps() { + return null; + } + + @Override public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b169889f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 01f4357..ece896b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -33,6 +33,7 @@ import java.util.List; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -485,9 +486,9 @@ public class TestRMNodeTransitions { NodeId nodeId = node.getNodeID(); // Expire a container - ContainerId completedContainerId = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(0, 0), 0), 0); + ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId)); Assert.assertEquals(1, node.getContainersToCleanUp().size()); @@ -512,6 +513,35 @@ public class TestRMNodeTransitions { Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0)); } + @Test(timeout=20000) + public void testUpdateHeartbeatResponseForAppLifeCycle() { + RMNodeImpl node = getRunningNode(); + NodeId nodeId = node.getNodeID(); + + ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1); + // Create a running container + ContainerId runningContainerId = BuilderUtils.newContainerId( 
+ BuilderUtils.newApplicationAttemptId( + runningAppId, 0), 0); + + ContainerStatus status = ContainerStatus.newInstance(runningContainerId, + ContainerState.RUNNING, "", 0); + List<ContainerStatus> statusList = new ArrayList<ContainerStatus>(); + statusList.add(status); + NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, + "", System.currentTimeMillis()); + node.handle(new RMNodeStatusEvent(nodeId, nodeHealth, + statusList, null, null)); + + Assert.assertEquals(1, node.getRunningApps().size()); + + // Finish an application + ApplicationId finishedAppId = runningAppId; + node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId)); + Assert.assertEquals(1, node.getAppsToCleanup().size()); + Assert.assertEquals(0, node.getRunningApps().size()); + } + private RMNodeImpl getRunningNode() { return getRunningNode(null, 0); }