This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 42fab78  YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
42fab78 is described below

commit 42fab7897a88d4cf42715b02f215e5f4436aba38
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Oct 28 10:32:47 2020 -0700

    YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
    
    (cherry picked from commit bab5bf9743f54f48cc2f31b4e5c8b6d4e5a5cfb8)
    (cherry picked from commit f95c0824b01175590fe98e2fba1e5988694a52da)
    (cherry picked from commit d0104e72c5fc83b2ede80bda4d178cfeee90539e)
---
 .../rmapp/attempt/RMAppAttemptImpl.java            |  28 +++--
 .../rmapp/attempt/TestRMAppAttemptTransitions.java | 115 +++++++++++++++++++++
 2 files changed, 137 insertions(+), 6 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index b8fb5b6..da146b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -178,6 +178,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private long launchAMEndTime = 0;
   private long scheduledTime = 0;
   private long containerAllocatedTime = 0;
+  private boolean nonWorkPreservingAMContainerFinished = false;
 
   // Set to null initially. Will eventually get set
   // if an RMAppAttemptUnregistrationEvent occurs
@@ -854,7 +855,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
       // A new allocate means the AM received the previously sent
       // finishedContainers. We can ack this to NM now
-      sendFinishedContainersToNM();
+      sendFinishedContainersToNM(finishedContainersSentToAM);
 
       // Mark every containerStatus as being sent to AM though we may return
       // only the ones that belong to the current attempt
@@ -1986,12 +1987,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   }
 
   // Ack NM to remove finished containers from context.
-  private void sendFinishedContainersToNM() {
-    for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
+  private void sendFinishedContainersToNM(
+      Map<NodeId, List<ContainerStatus>> finishedContainers) {
+    for (NodeId nodeId : finishedContainers.keySet()) {
 
       // Clear and get current values
       List<ContainerStatus> currentSentContainers =
-          finishedContainersSentToAM.put(nodeId, new ArrayList<>());
+          finishedContainers.put(nodeId, new ArrayList<>());
       List<ContainerId> containerIdList =
           new ArrayList<>(currentSentContainers.size());
       for (ContainerStatus containerStatus : currentSentContainers) {
@@ -2000,7 +2002,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
         containerIdList));
     }
-    this.finishedContainersSentToAM.clear();
+    finishedContainers.clear();
   }
 
   // Add am container to the list so that am container instance will be
@@ -2026,7 +2028,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
           new ArrayList<>());
       appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
-      appAttempt.sendFinishedContainersToNM();
+      appAttempt.sendFinishedContainersToNM(
+          appAttempt.finishedContainersSentToAM);
+      // there might be some completed containers that have not been pulled
+      // by the AM heartbeat, explicitly add them for cleanup.
+      appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
+
+      // mark the fact that AM container has finished so that future finished
+      // containers will be cleaned up without the engagement of AM containers
+      // (through heartbeat)
+      appAttempt.nonWorkPreservingAMContainerFinished = true;
     } else {
       appAttempt.sendFinishedAMContainerToNM(nodeId,
           containerStatus.getContainerId());
@@ -2054,6 +2065,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         .getNodeId(), new ArrayList<>());
     appAttempt.justFinishedContainers.get(containerFinishedEvent
             .getNodeId()).add(containerFinishedEvent.getContainerStatus());
+
+    if (appAttempt.nonWorkPreservingAMContainerFinished) {
+      // AM container has finished, so no more AM heartbeats to do the cleanup.
+      appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
+    }
   }
 
   private static final class ContainerFinishedAtFinalStateTransition
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index f5f8b76..8e53b09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -649,6 +649,8 @@ public class TestRMAppAttemptTransitions {
     RMContainer rmContainer = mock(RMContainerImpl.class);
     when(scheduler.getRMContainer(container.getId())).
         thenReturn(rmContainer);
+    when(container.getNodeId()).thenReturn(
+        BuilderUtils.newNodeId("localhost", 0));
     
     applicationAttempt.handle(
         new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@@ -1536,6 +1538,119 @@ public class TestRMAppAttemptTransitions {
         .handle(Mockito.any(RMNodeEvent.class));
   }
 
+  /**
+   * Check a completed container that is not yet pulled by AM heartbeat,
+   * is ACKed to NM for cleanup when the AM container exits.
+   */
+  @Test
+  public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(), amContainer.getNodeId()));
+
+    // Complete a non-AM container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(
+        containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    // Verify justFinishedContainers
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+    Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
+        .size());
+    Assert.assertEquals(container1.getId(), applicationAttempt
+        .getJustFinishedContainers().get(0).getContainerId());
+    Assert.assertTrue(
+        getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+
+    // finish AM container to emulate AM exit event
+    containerStatus1 = mock(ContainerStatus.class);
+    ContainerId amContainerId = amContainer.getId();
+    when(containerStatus1.getContainerId()).thenReturn(amContainerId);
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        amContainer.getNodeId()));
+
+    Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
+    List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
+        captor.getAllValues();
+    // Verify AM container is acked to NM via the RMNodeEvent immediately
+    Assert.assertEquals(amContainer.getId(),
+        containerPulledEvents.get(0).getContainers().get(0));
+    // Verify the non-AM container is acked to NM via the RMNodeEvent
+    Assert.assertEquals(container1.getId(),
+        containerPulledEvents.get(1).getContainers().get(0));
+    Assert.assertTrue("No container shall be added to justFinishedContainers" +
+            " as soon as AM container exits",
+        applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertTrue(
+        getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+  }
+
+  /**
+   * Check a completed container is ACKed to NM for cleanup after the AM
+   * container has exited.
+   */
+  @Test
+  public void testFinishedContainerAfterAMExit() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    // finish AM container to emulate AM exit event
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    ContainerId amContainerId = amContainer.getId();
+    when(containerStatus1.getContainerId()).thenReturn(amContainerId);
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(),
+        amContainer.getNodeId()));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        amContainer.getNodeId()));
+
+    // Verify AM container is acked to NM via the RMNodeEvent immediately
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+    Mockito.verify(rmnodeEventHandler).handle(captor.capture());
+    Assert.assertEquals(amContainer.getId(),
+        captor.getValue().getContainers().get(0));
+
+    // Complete a non-AM container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    // Verify container is acked to NM via the RMNodeEvent immediately
+    captor = ArgumentCaptor.forClass(
+        RMNodeFinishedContainersPulledByAMEvent.class);
+    Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
+    Assert.assertEquals(container1.getId(),
+        captor.getAllValues().get(1).getContainers().get(0));
+    Assert.assertTrue("No container shall be added to justFinishedContainers" +
+            " after AM container exited",
+        applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertTrue(
+        getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+  }
+
   private static List<ContainerStatus> getFinishedContainersSentToAM(
       RMAppAttempt applicationAttempt) {
     List<ContainerStatus> containers = new ArrayList<ContainerStatus>();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to