This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5e9510b549 YARN-8980. Mapreduce application container start fail after AM restart. (#5975) Contributed by Chenyu Zheng.
c5e9510b549 is described below

commit c5e9510b54932946f792cfc88acf8b022b5d2fe8
Author: zhengchenyu <zhengcheny...@163.com>
AuthorDate: Sat Sep 9 09:50:53 2023 +0800

    YARN-8980. Mapreduce application container start fail after AM restart. (#5975) Contributed by Chenyu Zheng.
    
    Reviewed-by: Shilun Fan <slfan1...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../amrmproxy/FederationInterceptor.java           |  26 ++++
 .../resourcemanager/DefaultAMSProcessor.java       |   7 +-
 .../TestWorkPreservingUnmanagedAM.java             | 155 +++++++++++++++++++--
 3 files changed, 176 insertions(+), 12 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index e7fcb1c3785..9c4c2c72e5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -260,6 +261,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
   private final MonotonicClock clock = new MonotonicClock();
 
+  /*
+   * For a UAM, keepContainersAcrossApplicationAttempts is always true.
+   * When the UAM re-registers with the RM, the RM clears its node set and regenerates
+   * NMTokens for the transferred containers. But if keepContainersAcrossApplicationAttempts
+   * of the AM is false, the AM may never call getNMTokensFromPreviousAttempts, so the
+   * NMTokens passed back in the RegisterApplicationMasterResponse would be lost. Here we
+   * cache these NMTokens and pass them to the AM in the allocate stage.
+   */
+  private Set<NMToken> nmTokenMapFromRegisterSecondaryCluster;
+
   /**
    * Creates an instance of the FederationInterceptor class.
    */
@@ -278,6 +289,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.finishAMCalled = false;
     this.lastSCResponseTime = new ConcurrentHashMap<>();
     this.lastAMHeartbeatTime = this.clock.getTime();
+    this.nmTokenMapFromRegisterSecondaryCluster = new ConcurrentHashSet<>();
   }
 
   /**
@@ -453,6 +465,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           // RegisterApplicationMaster
           RegisterApplicationMasterResponse response =
                this.uamPool.registerApplicationMaster(keyScId, this.amRegistrationRequest);
+          nmTokenMapFromRegisterSecondaryCluster.addAll(response.getNMTokensFromPreviousAttempts());
 
           // Set sub-cluster to be timed out initially
           lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
@@ -1096,6 +1109,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         if (registerResponse != null) {
           LOG.info("Merging register response for {}", appId);
           mergeRegisterResponse(homeResponse, registerResponse);
+          nmTokenMapFromRegisterSecondaryCluster.addAll(
+              registerResponse.getNMTokensFromPreviousAttempts());
         }
       } catch (Exception e) {
         LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e);
@@ -1434,6 +1449,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         }
       }
     }
+    // When re-registering with the RM, the client may not have cached the NMTokens from
+    // the register response, so we pass these NMTokens along again in the allocate stage.
+    if (nmTokenMapFromRegisterSecondaryCluster.size() > 0) {
+      List<NMToken> duplicateNmToken = new ArrayList(nmTokenMapFromRegisterSecondaryCluster);
+      nmTokenMapFromRegisterSecondaryCluster.removeAll(duplicateNmToken);
+      if (!isNullOrEmpty(mergedResponse.getNMTokens())) {
+        mergedResponse.getNMTokens().addAll(duplicateNmToken);
+      } else {
+        mergedResponse.setNMTokens(duplicateNmToken);
+      }
+    }
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 98fb9b34942..96e65855fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -168,14 +168,13 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
     // and corresponding NM tokens.
     if (app.getApplicationSubmissionContext()
         .getKeepContainersAcrossApplicationAttempts()) {
+      // Clear the node set remembered by the secret manager. Necessary
+      // for UAM restart because we use the same attemptId.
+      rmContext.getNMTokenSecretManager().clearNodeSetForAttempt(applicationAttemptId);
       List<Container> transferredContainers = getScheduler()
           .getTransferredContainers(applicationAttemptId);
       if (!transferredContainers.isEmpty()) {
         response.setContainersFromPreviousAttempts(transferredContainers);
-        // Clear the node set remembered by the secret manager. Necessary
-        // for UAM restart because we use the same attemptId.
-        rmContext.getNMTokenSecretManager()
-            .clearNodeSetForAttempt(applicationAttemptId);
 
         List<NMToken> nmTokens = new ArrayList<NMToken>();
         for (Container container : transferredContainers) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
index 3ed5cfd4f7b..3c57dfc19e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
@@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@@ -71,6 +75,7 @@ public class TestWorkPreservingUnmanagedAM
     MockNM nm =
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
     nm.registerNode();
+    Set<NodeId> tokenCacheClientSide = new HashSet();
 
     // create app and launch the UAM
     boolean unamanged = true;
@@ -98,14 +103,19 @@ public class TestWorkPreservingUnmanagedAM
 
     // Allocate two containers to UAM
     int numContainers = 3;
-    List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
-        new ArrayList<ContainerId>()).getAllocatedContainers();
+    AllocateResponse allocateResponse =
+        am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+    List<Container> conts = allocateResponse.getAllocatedContainers();
     while (conts.size() < numContainers) {
       nm.nodeHeartbeat(true);
-      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
       Thread.sleep(100);
     }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
 
     // Release one container
     List<ContainerId> releaseList =
@@ -127,6 +137,10 @@ public class TestWorkPreservingUnmanagedAM
     RegisterApplicationMasterResponse response = null;
     try {
       response = am.registerAppAttempt(false);
+      // When the AM restarts, the client-side NMToken cache is lost;
+      // repopulate it from the register response.
+      tokenCacheClientSide.clear();
+      response.getNMTokensFromPreviousAttempts()
+          .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
     } catch (InvalidApplicationMasterRequestException e) {
       Assert.assertEquals(false, keepContainers);
       return;
@@ -142,14 +156,124 @@ public class TestWorkPreservingUnmanagedAM
     numContainers = 1;
     am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
     nm.nodeHeartbeat(true);
-    conts = am.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers();
+    allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    conts = allocateResponse.getAllocatedContainers();
     while (conts.size() < numContainers) {
       nm.nodeHeartbeat(true);
-      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
       Thread.sleep(100);
     }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+    rm.stop();
+  }
+
+  protected void testUAMRestartWithoutTransferContainer(boolean keepContainers) throws Exception {
+    // start RM
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm.registerNode();
+    Set<NodeId> tokenCacheClientSide = new HashSet();
+
+    // create app and launch the UAM
+    boolean unamanged = true;
+    int maxAttempts = 1;
+    boolean waitForAccepted = true;
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+            .withAppName("")
+            .withUser(UserGroupInformation.getCurrentUser().getShortUserName())
+            .withAcls(null)
+            .withUnmanagedAM(unamanged)
+            .withQueue(null)
+            .withMaxAppAttempts(maxAttempts)
+            .withCredentials(null)
+            .withAppType(null)
+            .withWaitForAppAcceptedState(waitForAccepted)
+            .withKeepContainers(keepContainers)
+            .build();
+    RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+    MockAM am = MockRM.launchUAM(app, rm, nm);
+
+    // Register for the first time
+    am.registerAppAttempt();
+
+    // Allocate three containers to UAM
+    int numContainers = 3;
+    AllocateResponse allocateResponse =
+        am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+    List<Container> conts = allocateResponse.getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
+      Thread.sleep(100);
+    }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+    // Release all containers so there are no containers to transfer to the next app attempt.
+    List<ContainerId> releaseList = new ArrayList();
+    releaseList.add(conts.get(0).getId());
+    releaseList.add(conts.get(1).getId());
+    releaseList.add(conts.get(2).getId());
+    List<ContainerStatus> finishedConts =
+        am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+            .getCompletedContainersStatuses();
+    while (finishedConts.size() < releaseList.size()) {
+      nm.nodeHeartbeat(true);
+      finishedConts
+          .addAll(am
+              .allocate(new ArrayList<ResourceRequest>(),
+                  new ArrayList<ContainerId>())
+              .getCompletedContainersStatuses());
+      Thread.sleep(100);
+    }
+
+    // Register for the second time
+    RegisterApplicationMasterResponse response = null;
+    try {
+      response = am.registerAppAttempt(false);
+      // When the AM restarts, the client-side NMToken cache is lost;
+      // repopulate it from the register response.
+      tokenCacheClientSide.clear();
+      response.getNMTokensFromPreviousAttempts()
+          .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    } catch (InvalidApplicationMasterRequestException e) {
+      Assert.assertEquals(false, keepContainers);
+      return;
+    }
+    Assert.assertEquals("RM should not allow second register"
+        + " for UAM without keep container flag ", true, keepContainers);
+
+    // Expect zero containers and NMTokens from the previous attempt
+    Assert.assertEquals(0, response.getContainersFromPreviousAttempts().size());
+    Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
+
+    // Allocate one more container to UAM, just to be safe
+    numContainers = 1;
+    am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
+    nm.nodeHeartbeat(true);
+    allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    conts = allocateResponse.getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
+      Thread.sleep(100);
+    }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
 
     rm.stop();
   }
@@ -164,4 +288,19 @@ public class TestWorkPreservingUnmanagedAM
     testUAMRestart(false);
   }
 
+  @Test(timeout = 600000)
+  public void testUAMRestartKeepContainersWithoutTransferContainer() throws Exception {
+    testUAMRestartWithoutTransferContainer(true);
+  }
+
+  @Test(timeout = 600000)
+  public void testUAMRestartNoKeepContainersWithoutTransferContainer() throws Exception {
+    testUAMRestartWithoutTransferContainer(false);
+  }
+
+  private void checkNMTokenForContainer(Set<NodeId> cacheToken, List<Container> containers) {
+    for (Container container : containers) {
+      Assert.assertTrue(cacheToken.contains(container.getNodeId()));
+    }
+  }
 }
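
The sketch below is not part of the commit; it is a minimal, hypothetical AM-side view of the
two paths by which NMTokens reach an application master, which is the behaviour the patch relies
on. The class and method names (NmTokenCacheSketch, onRegister, onAllocate) are invented for
illustration; the YARN record types and getters it calls are the real YARN APIs referenced in the
diff above. Because an AM that does not keep containers across attempts may never read the
register-time tokens, FederationInterceptor now replays them through the allocate path, which
every AM consumes.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.NMToken;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.Token;

    // Hypothetical helper, not Hadoop code: tracks which nodes we hold NMTokens for.
    public class NmTokenCacheSketch {

      private final Map<NodeId, Token> cache = new HashMap<>();

      // Path 1: tokens for transferred containers, delivered at (re-)registration.
      // An AM without keepContainersAcrossApplicationAttempts may never make this call.
      public void onRegister(RegisterApplicationMasterResponse response) {
        for (NMToken t : response.getNMTokensFromPreviousAttempts()) {
          cache.put(t.getNodeId(), t.getToken());
        }
      }

      // Path 2: tokens delivered with normal allocations; every AM processes these,
      // which is why the interceptor re-delivers the cached register-time tokens here.
      public void onAllocate(AllocateResponse response) {
        for (NMToken t : response.getNMTokens()) {
          cache.put(t.getNodeId(), t.getToken());
        }
      }

      // A container can only be started on a node for which an NMToken is held.
      public boolean canStartContainerOn(NodeId nodeId) {
        return cache.containsKey(nodeId);
      }
    }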

