[ https://issues.apache.org/jira/browse/YARN-11306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607582#comment-17607582 ]
ASF GitHub Bot commented on YARN-11306: --------------------------------------- slfan1989 commented on code in PR #4897: URL: https://github.com/apache/hadoop/pull/4897#discussion_r976164888 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java: ########## @@ -1242,4 +1255,181 @@ public void testRemoveAppFromRegistryApplicationFailed() return null; }); } + + public void testRecoverWithBadSubCluster(final RegistryOperations registryObj) + throws IOException, InterruptedException { + + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + + // Prepare a list of subclusters + List<SubClusterId> subClusterIds = new ArrayList<>(); + SubClusterId sc1 = SubClusterId.newInstance("SC-1"); + SubClusterId sc2 = SubClusterId.newInstance("SC-2"); + SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID); + subClusterIds.add(sc1); + subClusterIds.add(sc2); + subClusterIds.add(homeSC); + + // Prepare AMRMProxy Context + AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registryObj); + + // Prepare RegisterApplicationMasterRequest + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + ugi.doAs((PrivilegedExceptionAction<Object>) () -> { + + // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor + initSubClusterAndInterceptor(subClusterIds, registryObj); + + // Step2. Register Application And Assign Containers + List<Container> containers = registerApplicationAndAssignContainers(registerReq); + + // Step3. Offline SC-1 cluster + offlineSubClusterSC1(sc1); + + // Step4. 
Recover ApplicationMaster + recoverApplicationMaster(appContext); + + // Step5. We recovered ApplicationMaster. + // SC-1 was offline, SC-2 was recovered at this time, UnmanagedAMPool.size=1 and only SC-2 + UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool(); + Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds(); + Assert.assertNotNull(allUAMIds); + Assert.assertTrue(allUAMIds.size() == 1); + Assert.assertTrue(allUAMIds.contains(sc2.getId())); + + // Step6. The first allocate call expects a fail-over exception and re-register. + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(0); + LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class, + "AMRMProxy just restarted and recovered for " + this.attemptId + + ". AM should re-register and full re-send pending requests.", + () -> interceptor.allocate(allocateRequest)); + interceptor.registerApplicationMaster(registerReq); + + // Step7. release Containers + releaseContainers(containers, sc1); + + // Step8. 
finish application + finishApplication(); + + return null; + }); + } + + private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds, + RegistryOperations registryObj) throws YarnException { + // Prepare subClusters SC-1, SC-2, HomeSC + for (SubClusterId subClusterId : subClusterIds) { + registerSubCluster(subClusterId); + } + + // Prepare Interceptor + interceptor = new TestableFederationInterceptor(); + AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registryObj); + interceptor.init(appContext); + interceptor.cleanupRegistry(); + } + + private List<Container> registerApplicationAndAssignContainers( + RegisterApplicationMasterRequest registerReq) throws Exception { + + // Register HomeSC + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + // We only registered HomeSC, so UnmanagedAMPoolSize should be empty + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // We assign 3 Containers to each cluster + int numberOfContainers = 3; + List<Container> containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 3); + + // At this point, UnmanagedAMPoolSize should be equal to 2 and should contain SC-1, SC-2 + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool(); + Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds(); + Assert.assertNotNull(allUAMIds); + Assert.assertTrue(allUAMIds.size() == 2); + Assert.assertTrue(allUAMIds.contains("SC-1")); + Assert.assertTrue(allUAMIds.contains("SC-2")); + + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + + return containers; + } + + private void offlineSubClusterSC1(SubClusterId subClusterId) throws YarnException { + + ConcurrentHashMap<String, 
MockResourceManagerFacade> secondaries = + interceptor.getSecondaryRMs(); + + // SC-1 out of service + deRegisterSubCluster(subClusterId); + secondaries.get(subClusterId.getId()).setRunningMode(false); + } + + private void recoverApplicationMaster(AMRMProxyApplicationContext appContext) + throws IOException { + // Prepare for Federation Interceptor restart and recover + Map<String, byte[]> recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + + // Preserve the mock RM instances + MockResourceManagerFacade homeRM = interceptor.getHomeRM(); + + // Create a new interceptor instance and recover + interceptor = new TestableFederationInterceptor(homeRM, + interceptor.getSecondaryRMs()); + interceptor.init(appContext); + interceptor.recover(recoveredDataMap); + } + + private void releaseContainers(List<Container> containers, SubClusterId subClusterId) + throws Exception { + + ConcurrentHashMap<String, MockResourceManagerFacade> secondaries = + interceptor.getSecondaryRMs(); + lastResponseId = 0; + + // Get the Container list of SC-1 + MockResourceManagerFacade sc1Facade = secondaries.get("SC-1"); + HashMap<ApplicationId, List<ContainerId>> appContainerMap = + sc1Facade.getApplicationContainerIdMap(); + Assert.assertNotNull(appContainerMap); + ApplicationId applicationId = attemptId.getApplicationId(); + Assert.assertNotNull(applicationId); + List<ContainerId> sc1ContainerList = appContainerMap.get(applicationId); + + // Release all containers, + // Because SC-1 is offline, it is necessary to clean up the Containers allocated by SC-1 + containers = containers.stream() + .filter(container -> !sc1ContainerList.contains(container.getId())) + .collect(Collectors.toList()); + releaseContainersAndAssert(containers); + } + + private void finishApplication() throws IOException, YarnException { + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + 
finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertEquals(true, finishResponse.getIsUnregistered()); Review Comment: I will fix it. > Refactor NM#FederationInterceptor#recover Code > ---------------------------------------------- > > Key: YARN-11306 > URL: https://issues.apache.org/jira/browse/YARN-11306 > Project: Hadoop YARN > Issue Type: Improvement > Components: federation, nodemanager > Affects Versions: 3.4.0 > Reporter: fanshilun > Assignee: fanshilun > Priority: Major > Labels: pull-request-available > > Refactor NM#FederationInterceptor#recover Code > 1. Enhance code readability > 2. Add empty check > 3. When an exception is encountered, completely destroy the data of the SubCluster -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org