This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new c5e9510b549 YARN-8980. Mapreduce application container start fail after AM restart. (#5975) Contributed by Chenyu Zheng.
c5e9510b549 is described below

commit c5e9510b54932946f792cfc88acf8b022b5d2fe8
Author: zhengchenyu <zhengcheny...@163.com>
AuthorDate: Sat Sep 9 09:50:53 2023 +0800

    YARN-8980. Mapreduce application container start fail after AM restart. (#5975) Contributed by Chenyu Zheng.

    Reviewed-by: Shilun Fan <slfan1...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../amrmproxy/FederationInterceptor.java           |  26 ++++
 .../resourcemanager/DefaultAMSProcessor.java       |   7 +-
 .../TestWorkPreservingUnmanagedAM.java             | 155 +++++++++++++++++++--
 3 files changed, 176 insertions(+), 12 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index e7fcb1c3785..9c4c2c72e5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -260,6 +261,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
   private final MonotonicClock clock = new MonotonicClock();
 
+  /*
+   * For UAM, keepContainersAcrossApplicationAttempts is always true. When the UAM
+   * re-registers with the RM, the RM clears the node set and regenerates NMTokens for the
+   * transferred containers. But if keepContainersAcrossApplicationAttempts of the AM is
+   * false, the AM may never call getNMTokensFromPreviousAttempts, so the NMTokens passed in
+   * RegisterApplicationMasterResponse would be lost. Here we cache these NMTokens and
+   * pass them to the AM in the allocate stage.
+   */
+  private Set<NMToken> nmTokenMapFromRegisterSecondaryCluster;
+
   /**
    * Creates an instance of the FederationInterceptor class.
   */
@@ -278,6 +289,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.finishAMCalled = false;
     this.lastSCResponseTime = new ConcurrentHashMap<>();
     this.lastAMHeartbeatTime = this.clock.getTime();
+    this.nmTokenMapFromRegisterSecondaryCluster = new ConcurrentHashSet<>();
   }
 
   /**
@@ -453,6 +465,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           // RegisterApplicationMaster
           RegisterApplicationMasterResponse response =
               this.uamPool.registerApplicationMaster(keyScId, this.amRegistrationRequest);
+          nmTokenMapFromRegisterSecondaryCluster.addAll(response.getNMTokensFromPreviousAttempts());
 
           // Set sub-cluster to be timed out initially
           lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
@@ -1096,6 +1109,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         if (registerResponse != null) {
           LOG.info("Merging register response for {}", appId);
           mergeRegisterResponse(homeResponse, registerResponse);
+          nmTokenMapFromRegisterSecondaryCluster.addAll(
+              registerResponse.getNMTokensFromPreviousAttempts());
         }
       } catch (Exception e) {
         LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e);
@@ -1434,6 +1449,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         }
       }
     }
+    // When the AM re-registers with the RM, the client may not have cached the NMTokens from
+    // the register response. Here we pass these NMTokens along in the allocate stage.
+    if (nmTokenMapFromRegisterSecondaryCluster.size() > 0) {
+      List<NMToken> duplicateNmToken = new ArrayList(nmTokenMapFromRegisterSecondaryCluster);
+      nmTokenMapFromRegisterSecondaryCluster.removeAll(duplicateNmToken);
+      if (!isNullOrEmpty(mergedResponse.getNMTokens())) {
+        mergedResponse.getNMTokens().addAll(duplicateNmToken);
+      } else {
+        mergedResponse.setNMTokens(duplicateNmToken);
+      }
+    }
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 98fb9b34942..96e65855fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -168,14 +168,13 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
     // and corresponding NM tokens.
     if (app.getApplicationSubmissionContext()
         .getKeepContainersAcrossApplicationAttempts()) {
+      // Clear the node set remembered by the secret manager. Necessary
+      // for UAM restart because we use the same attemptId.
+      rmContext.getNMTokenSecretManager().clearNodeSetForAttempt(applicationAttemptId);
       List<Container> transferredContainers = getScheduler()
           .getTransferredContainers(applicationAttemptId);
       if (!transferredContainers.isEmpty()) {
         response.setContainersFromPreviousAttempts(transferredContainers);
-        // Clear the node set remembered by the secret manager. Necessary
-        // for UAM restart because we use the same attemptId.
-        rmContext.getNMTokenSecretManager()
-            .clearNodeSetForAttempt(applicationAttemptId);
 
         List<NMToken> nmTokens = new ArrayList<NMToken>();
         for (Container container : transferredContainers) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
index 3ed5cfd4f7b..3c57dfc19e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
@@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@@ -71,6 +75,7 @@ public class TestWorkPreservingUnmanagedAM
     MockNM nm =
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
     nm.registerNode();
+    Set<NodeId> tokenCacheClientSide = new HashSet();
 
     // create app and launch the UAM
     boolean unamanged = true;
@@ -98,14 +103,19 @@ public class TestWorkPreservingUnmanagedAM
 
     // Allocate two containers to UAM
     int numContainers = 3;
-    List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
-        new ArrayList<ContainerId>()).getAllocatedContainers();
+    AllocateResponse allocateResponse =
+        am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    List<Container> conts = allocateResponse.getAllocatedContainers();
     while (conts.size() < numContainers) {
       nm.nodeHeartbeat(true);
-      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
       Thread.sleep(100);
     }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
 
     // Release one container
     List<ContainerId> releaseList =
@@ -127,6 +137,10 @@ public class TestWorkPreservingUnmanagedAM
     RegisterApplicationMasterResponse response = null;
     try {
       response = am.registerAppAttempt(false);
+      // When the AM restarts, the client-side NMToken cache should be empty again
+      tokenCacheClientSide.clear();
+      response.getNMTokensFromPreviousAttempts()
+          .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
     } catch (InvalidApplicationMasterRequestException e) {
       Assert.assertEquals(false, keepContainers);
       return;
     }
@@ -142,14 +156,124 @@ public class TestWorkPreservingUnmanagedAM
     numContainers = 1;
     am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
     nm.nodeHeartbeat(true);
-    conts = am.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers();
+    allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    conts = allocateResponse.getAllocatedContainers();
     while (conts.size() < numContainers) {
       nm.nodeHeartbeat(true);
-      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
       Thread.sleep(100);
     }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+    rm.stop();
+  }
+
+  protected void testUAMRestartWithoutTransferContainer(boolean keepContainers) throws Exception {
+    // start RM
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm.registerNode();
+    Set<NodeId> tokenCacheClientSide = new HashSet();
+
+    // create app and launch the UAM
+    boolean unamanged = true;
+    int maxAttempts = 1;
+    boolean waitForAccepted = true;
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+            .withAppName("")
+            .withUser(UserGroupInformation.getCurrentUser().getShortUserName())
+            .withAcls(null)
+            .withUnmanagedAM(unamanged)
+            .withQueue(null)
+            .withMaxAppAttempts(maxAttempts)
+            .withCredentials(null)
+            .withAppType(null)
+            .withWaitForAppAcceptedState(waitForAccepted)
+            .withKeepContainers(keepContainers)
+            .build();
+    RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+    MockAM am = MockRM.launchUAM(app, rm, nm);
+
+    // Register for the first time
+    am.registerAppAttempt();
+
+    // Allocate three containers to the UAM
+    int numContainers = 3;
+    AllocateResponse allocateResponse =
+        am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    List<Container> conts = allocateResponse.getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
+      Thread.sleep(100);
+    }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+    // Release all containers, so that no containers are transferred to the next app attempt
+    List<ContainerId> releaseList = new ArrayList();
+    releaseList.add(conts.get(0).getId());
+    releaseList.add(conts.get(1).getId());
+    releaseList.add(conts.get(2).getId());
+    List<ContainerStatus> finishedConts =
+        am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+            .getCompletedContainersStatuses();
+    while (finishedConts.size() < releaseList.size()) {
+      nm.nodeHeartbeat(true);
+      finishedConts
+          .addAll(am
+              .allocate(new ArrayList<ResourceRequest>(),
+                  new ArrayList<ContainerId>())
+              .getCompletedContainersStatuses());
+      Thread.sleep(100);
+    }
+
+    // Register for the second time
+    RegisterApplicationMasterResponse response = null;
+    try {
+      response = am.registerAppAttempt(false);
+      // When the AM restarts, the client-side NMToken cache should be empty again
+      tokenCacheClientSide.clear();
+      response.getNMTokensFromPreviousAttempts()
+          .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    } catch (InvalidApplicationMasterRequestException e) {
+      Assert.assertEquals(false, keepContainers);
+      return;
+    }
+    Assert.assertEquals("RM should not allow second register"
+        + " for UAM without keep container flag ", true, keepContainers);
+
+    // Expect zero running containers from the previous attempt
+    Assert.assertEquals(0, response.getContainersFromPreviousAttempts().size());
+    Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
+
+    // Allocate one more container to the UAM, just to be safe
+    numContainers = 1;
+    am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
+    nm.nodeHeartbeat(true);
+    allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    conts = allocateResponse.getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
+      Thread.sleep(100);
+    }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
 
     rm.stop();
   }
@@ -164,4 +288,19 @@ public class TestWorkPreservingUnmanagedAM
     testUAMRestart(false);
   }
 
+  @Test(timeout = 600000)
+  public void testUAMRestartKeepContainersWithoutTransferContainer() throws Exception {
+    testUAMRestartWithoutTransferContainer(true);
+  }
+
+  @Test(timeout = 600000)
+  public void testUAMRestartNoKeepContainersWithoutTransferContainer() throws Exception {
+    testUAMRestartWithoutTransferContainer(false);
+  }
+
+  private void checkNMTokenForContainer(Set<NodeId> cacheToken, List<Container> containers) {
+    for (Container container : containers) {
+      Assert.assertTrue(cacheToken.contains(container.getNodeId()));
+    }
+  }
 }
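For readers skimming the patch, the FederationInterceptor change boils down to a cache-and-drain hand-off: NMTokens returned in RegisterApplicationMasterResponse when UAMs register or re-attach in secondary sub-clusters are stashed in a concurrent set, and the next merged allocate response drains that set, so an AM that never calls getNMTokensFromPreviousAttempts still receives the tokens. The sketch below illustrates only that pattern; NMTokenRelay, onRegister, and drainIntoAllocateResponse are hypothetical names, and the real code operates on org.apache.hadoop.yarn.api.records.NMToken inside FederationInterceptor.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    /** Illustrative only: cache tokens seen at (re-)register time, drain them at allocate time. */
    public class NMTokenRelay<T> {

      // Tokens collected from register responses of secondary sub-clusters.
      private final Set<T> pendingTokens =
          Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());

      /** Called whenever a UAM registers or re-attaches and reports previous-attempt tokens. */
      public void onRegister(List<T> tokensFromPreviousAttempts) {
        pendingTokens.addAll(tokensFromPreviousAttempts);
      }

      /** Called while merging an allocate response; moves each cached token into it exactly once. */
      public void drainIntoAllocateResponse(List<T> allocateResponseTokens) {
        List<T> drained = new ArrayList<>(pendingTokens);
        pendingTokens.removeAll(drained);
        allocateResponseTokens.addAll(drained);
      }
    }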
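The new test cases check the same invariant from the client side: every node hosting an allocated container must be covered by an NMToken the AM has seen, whether it arrived through an allocate response or through getNMTokensFromPreviousAttempts after re-registration. A rough stand-alone rendering of that bookkeeping follows; NodeTokenCache and its methods are invented for illustration and are not part of the test code.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /** Illustrative only: the AM-side bookkeeping the test asserts on. */
    public class NodeTokenCache {

      // Nodes (rendered as host:port strings here) for which an NMToken has been received.
      private final Set<String> nodesWithToken = new HashSet<>();

      /** Record a token from an allocate response, or from previous-attempt tokens at register time. */
      public void remember(String nodeId) {
        nodesWithToken.add(nodeId);
      }

      /** An AM that restarts loses its cache and must repopulate it from the register response. */
      public void reset() {
        nodesWithToken.clear();
      }

      /** Mirrors checkNMTokenForContainer(): every allocated container's node must be covered. */
      public boolean coversAllNodes(List<String> containerNodeIds) {
        return nodesWithToken.containsAll(containerNodeIds);
      }
    }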