YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed container statuses on heartbeat. Contributed by Chengbing Liu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/786108d3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/786108d3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/786108d3 Branch: refs/heads/HDFS-EC Commit: 786108d311f370a706026193ef7a5f335b7a7cab Parents: b174410 Author: Jian He <[email protected]> Authored: Thu Jan 8 11:12:54 2015 -0800 Committer: Zhe Zhang <[email protected]> Committed: Mon Jan 12 10:18:01 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 39 ++++++++++----- .../nodemanager/TestNodeStatusUpdater.java | 52 +++++++++++++------- 3 files changed, 64 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/786108d3/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c7e65f1..514c282 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -336,6 +336,9 @@ Release 2.7.0 - UNRELEASED YARN-2936. Changed YARNDelegationTokenIdentifier to set proto fields on getProto method. (Varun Saxena via jianhe) + YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed + container statuses on heartbeat. 
(Chengbing Liu via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/786108d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f561dbb..6ddd7e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -106,6 +106,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // the AM finishes it informs the RM to stop the may-be-already-completed // containers. private final Map<ContainerId, Long> recentlyStoppedContainers; + // Save the reported completed containers in case of lost heartbeat responses. + // These completed containers will be sent again till a successful response. + private final Map<ContainerId, ContainerStatus> pendingCompletedContainers; // Duration for which to track recently stopped container. 
private long durationToTrackStoppedContainers; @@ -126,6 +129,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>(); + this.pendingCompletedContainers = + new HashMap<ContainerId, ContainerStatus>(); } @Override @@ -358,11 +363,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>(); for (Container container : this.context.getContainers().values()) { ContainerId containerId = container.getContainerId(); - ApplicationId applicationId = container.getContainerId() - .getApplicationAttemptId().getApplicationId(); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); - containerStatuses.add(containerStatus); if (containerStatus.getState() == ContainerState.COMPLETE) { if (isApplicationStopped(applicationId)) { if (LOG.isDebugEnabled()) { @@ -370,14 +374,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements + containerId + " from NM context."); } context.getContainers().remove(containerId); + pendingCompletedContainers.put(containerId, containerStatus); } else { - // Adding to finished containers cache. Cache will keep it around at - // least for #durationToTrackStoppedContainers duration. In the - // subsequent call to stop container it will get removed from cache. - addCompletedContainer(container.getContainerId()); + if (!isContainerRecentlyStopped(containerId)) { + pendingCompletedContainers.put(containerId, containerStatus); + // Adding to finished containers cache. Cache will keep it around at + // least for #durationToTrackStoppedContainers duration. In the + // subsequent call to stop container it will get removed from cache. 
+ addCompletedContainer(containerId); + } } + } else { + containerStatuses.add(containerStatus); } } + containerStatuses.addAll(pendingCompletedContainers.values()); if (LOG.isDebugEnabled()) { LOG.debug("Sending out " + containerStatuses.size() + " container statuses: " + containerStatuses); @@ -397,8 +408,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new ArrayList<NMContainerStatus>(); for (Container container : this.context.getContainers().values()) { ContainerId containerId = container.getContainerId(); - ApplicationId applicationId = container.getContainerId() - .getApplicationAttemptId().getApplicationId(); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); if (!this.context.getApplications().containsKey(applicationId)) { context.getContainers().remove(containerId); continue; @@ -410,7 +421,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. 
- addCompletedContainer(container.getContainerId()); + addCompletedContainer(containerId); } } LOG.info("Sending out " + containerStatuses.size() @@ -457,7 +468,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements ContainerId containerId = iter.next(); // remove the container only if the container is at DONE state Container nmContainer = context.getContainers().get(containerId); - if (nmContainer != null && nmContainer.getContainerState().equals( + if (nmContainer == null) { + iter.remove(); + } else if (nmContainer.getContainerState().equals( org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { context.getContainers().remove(containerId); removedContainers.add(containerId); @@ -469,6 +482,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Removed completed containers from NM context: " + removedContainers); } + pendingCompletedContainers.clear(); } private void trackAppsForKeepAlive(List<ApplicationId> appIds) { @@ -507,7 +521,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements recentlyStoppedContainers.clear(); } } - + @Private @VisibleForTesting public void removeVeryOldStoppedContainersFromCache() { @@ -605,6 +619,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements ResourceManagerConstants.RM_INVALID_IDENTIFIER; dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + pendingCompletedContainers.clear(); break; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/786108d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index e367085..46d7b10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -610,14 +610,14 @@ public class TestNodeStatusUpdater { <ContainerId>(); try { if (heartBeatID == 0) { - Assert.assertEquals(request.getNodeStatus().getContainersStatuses() - .size(), 0); - Assert.assertEquals(context.getContainers().size(), 0); + Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses() + .size()); + Assert.assertEquals(0, context.getContainers().size()); } else if (heartBeatID == 1) { List<ContainerStatus> statuses = request.getNodeStatus().getContainersStatuses(); - Assert.assertEquals(statuses.size(), 2); - Assert.assertEquals(context.getContainers().size(), 2); + Assert.assertEquals(2, statuses.size()); + Assert.assertEquals(2, context.getContainers().size()); boolean container2Exist = false, container3Exist = false; for (ContainerStatus status : statuses) { @@ -643,8 +643,16 @@ public class TestNodeStatusUpdater { } else if (heartBeatID == 2 || heartBeatID == 3) { List<ContainerStatus> statuses = request.getNodeStatus().getContainersStatuses(); - Assert.assertEquals(statuses.size(), 4); - Assert.assertEquals(context.getContainers().size(), 4); + if (heartBeatID == 2) { + // NM should send completed containers again, since the last + // heartbeat is lost. + Assert.assertEquals(4, statuses.size()); + } else { + // NM should not send completed containers again, since the last + // heartbeat is successful. 
+ Assert.assertEquals(2, statuses.size()); + } + Assert.assertEquals(4, context.getContainers().size()); boolean container2Exist = false, container3Exist = false, container4Exist = false, container5Exist = false; @@ -674,8 +682,14 @@ public class TestNodeStatusUpdater { container5Exist = true; } } - Assert.assertTrue(container2Exist && container3Exist - && container4Exist && container5Exist); + if (heartBeatID == 2) { + Assert.assertTrue(container2Exist && container3Exist + && container4Exist && container5Exist); + } else { + // NM do not send completed containers again + Assert.assertTrue(container2Exist && !container3Exist + && container4Exist && !container5Exist); + } if (heartBeatID == 3) { finishedContainersPulledByAM.add(containerStatus3.getContainerId()); @@ -683,8 +697,9 @@ public class TestNodeStatusUpdater { } else if (heartBeatID == 4) { List<ContainerStatus> statuses = request.getNodeStatus().getContainersStatuses(); - Assert.assertEquals(statuses.size(), 3); - Assert.assertEquals(context.getContainers().size(), 3); + Assert.assertEquals(2, statuses.size()); + // Container 3 is acked by AM, hence removed from context + Assert.assertEquals(3, context.getContainers().size()); boolean container3Exist = false; for (ContainerStatus status : statuses) { @@ -917,13 +932,14 @@ public class TestNodeStatusUpdater { nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers); Set<ContainerId> containerIdSet = new HashSet<ContainerId>(); - for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) { + List<ContainerStatus> containerStatuses = nodeStatusUpdater.getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { containerIdSet.add(status.getContainerId()); } - Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1); + Assert.assertEquals(1, containerStatuses.size()); // completed container is removed; - Assert.assertFalse(containerIdSet.contains(anyCompletedContainer)); + 
Assert.assertFalse(containerIdSet.contains(cId)); // running container is not removed; Assert.assertTrue(containerIdSet.contains(runningContainerId)); } @@ -967,15 +983,15 @@ public class TestNodeStatusUpdater { when(application.getApplicationState()).thenReturn( ApplicationState.FINISHING_CONTAINERS_WAIT); - // The completed container will be sent one time. Then we will delete it. + // The completed container will be saved in case of lost heartbeat. + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); - Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); nm.getNMContext().getContainers().put(cId, anyCompletedContainer); nm.getNMContext().getApplications().remove(appId); - // The completed container will be sent one time. Then we will delete it. + // The completed container will be saved in case of lost heartbeat. + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); - Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); } @Test
