This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 42fab78  YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
42fab78 is described below

commit 42fab7897a88d4cf42715b02f215e5f4436aba38
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Oct 28 10:32:47 2020 -0700

    YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen

    (cherry picked from commit bab5bf9743f54f48cc2f31b4e5c8b6d4e5a5cfb8)
    (cherry picked from commit f95c0824b01175590fe98e2fba1e5988694a52da)
    (cherry picked from commit d0104e72c5fc83b2ede80bda4d178cfeee90539e)
---
 .../rmapp/attempt/RMAppAttemptImpl.java            |  28 +++--
 .../rmapp/attempt/TestRMAppAttemptTransitions.java | 115 +++++++++++++++++++++
 2 files changed, 137 insertions(+), 6 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index b8fb5b6..da146b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -178,6 +178,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private long launchAMEndTime = 0;
   private long scheduledTime = 0;
   private long containerAllocatedTime = 0;
+  private boolean nonWorkPreservingAMContainerFinished = false;
 
   // Set to null initially. Will eventually get set
   // if an RMAppAttemptUnregistrationEvent occurs
@@ -854,7 +855,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
       // A new allocate means the AM received the previously sent
       // finishedContainers. We can ack this to NM now
-      sendFinishedContainersToNM();
+      sendFinishedContainersToNM(finishedContainersSentToAM);
 
       // Mark every containerStatus as being sent to AM though we may return
       // only the ones that belong to the current attempt
@@ -1986,12 +1987,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   }
 
   // Ack NM to remove finished containers from context.
-  private void sendFinishedContainersToNM() {
-    for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
+  private void sendFinishedContainersToNM(
+      Map<NodeId, List<ContainerStatus>> finishedContainers) {
+    for (NodeId nodeId : finishedContainers.keySet()) {
 
       // Clear and get current values
       List<ContainerStatus> currentSentContainers =
-          finishedContainersSentToAM.put(nodeId, new ArrayList<>());
+          finishedContainers.put(nodeId, new ArrayList<>());
       List<ContainerId> containerIdList =
           new ArrayList<>(currentSentContainers.size());
       for (ContainerStatus containerStatus : currentSentContainers) {
@@ -2000,7 +2002,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
           containerIdList));
     }
-    this.finishedContainersSentToAM.clear();
+    finishedContainers.clear();
   }
 
   // Add am container to the list so that am container instance will be
@@ -2026,7 +2028,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
             new ArrayList<>());
         appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
-        appAttempt.sendFinishedContainersToNM();
+        appAttempt.sendFinishedContainersToNM(
+            appAttempt.finishedContainersSentToAM);
+        // there might be some completed containers that have not been pulled
+        // by the AM heartbeat, explicitly add them for cleanup.
+        appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
+
+        // mark the fact that AM container has finished so that future finished
+        // containers will be cleaned up without the engagement of AM containers
+        // (through heartbeat)
+        appAttempt.nonWorkPreservingAMContainerFinished = true;
       } else {
         appAttempt.sendFinishedAMContainerToNM(nodeId,
             containerStatus.getContainerId());
@@ -2054,6 +2065,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         .getNodeId(), new ArrayList<>());
     appAttempt.justFinishedContainers.get(containerFinishedEvent
         .getNodeId()).add(containerFinishedEvent.getContainerStatus());
+
+    if (appAttempt.nonWorkPreservingAMContainerFinished) {
+      // AM container has finished, so no more AM heartbeats to do the cleanup.
+      appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
+    }
   }
 
   private static final class ContainerFinishedAtFinalStateTransition
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index f5f8b76..8e53b09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -649,6 +649,8 @@ public class TestRMAppAttemptTransitions {
     RMContainer rmContainer = mock(RMContainerImpl.class);
     when(scheduler.getRMContainer(container.getId())).
         thenReturn(rmContainer);
+    when(container.getNodeId()).thenReturn(
+        BuilderUtils.newNodeId("localhost", 0));
 
     applicationAttempt.handle(
         new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@@ -1536,6 +1538,119 @@ public class TestRMAppAttemptTransitions {
         .handle(Mockito.any(RMNodeEvent.class));
   }
 
+  /**
+   * Check a completed container that is not yet pulled by AM heartbeat,
+   * is ACKed to NM for cleanup when the AM container exits.
+   */
+  @Test
+  public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(), amContainer.getNodeId()));
+
+    // Complete a non-AM container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(
+        containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    // Verify justFinishedContainers
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+    Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
+        .size());
+    Assert.assertEquals(container1.getId(), applicationAttempt
+        .getJustFinishedContainers().get(0).getContainerId());
+    Assert.assertTrue(
+        getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+
+    // finish AM container to emulate AM exit event
+    containerStatus1 = mock(ContainerStatus.class);
+    ContainerId amContainerId = amContainer.getId();
+    when(containerStatus1.getContainerId()).thenReturn(amContainerId);
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        amContainer.getNodeId()));
+
+    Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
+    List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
+        captor.getAllValues();
+    // Verify AM container is acked to NM via the RMNodeEvent immediately
+    Assert.assertEquals(amContainer.getId(),
+        containerPulledEvents.get(0).getContainers().get(0));
+    // Verify the non-AM container is acked to NM via the RMNodeEvent
+    Assert.assertEquals(container1.getId(),
+        containerPulledEvents.get(1).getContainers().get(0));
+    Assert.assertTrue("No container shall be added to justFinishedContainers" +
+        " as soon as AM container exits",
+        applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertTrue(
+        getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+  }
+
+  /**
+   * Check a completed container is ACKed to NM for cleanup after the AM
+   * container has exited.
+   */
+  @Test
+  public void testFinishedContainerAfterAMExit() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    // finish AM container to emulate AM exit event
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    ContainerId amContainerId = amContainer.getId();
+    when(containerStatus1.getContainerId()).thenReturn(amContainerId);
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(),
+        amContainer.getNodeId()));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        amContainer.getNodeId()));
+
+    // Verify AM container is acked to NM via the RMNodeEvent immediately
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+    Mockito.verify(rmnodeEventHandler).handle(captor.capture());
+    Assert.assertEquals(amContainer.getId(),
+        captor.getValue().getContainers().get(0));
+
+    // Complete a non-AM container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    // Verify container is acked to NM via the RMNodeEvent immediately
+    captor = ArgumentCaptor.forClass(
+        RMNodeFinishedContainersPulledByAMEvent.class);
+    Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
+    Assert.assertEquals(container1.getId(),
+        captor.getAllValues().get(1).getContainers().get(0));
+    Assert.assertTrue("No container shall be added to justFinishedContainers" +
+        " after AM container exited",
+        applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertTrue(
+        getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+  }
+
   private static List<ContainerStatus> getFinishedContainersSentToAM(
       RMAppAttempt applicationAttempt) {
     List<ContainerStatus> containers = new ArrayList<ContainerStatus>();

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org