[ 
https://issues.apache.org/jira/browse/YARN-11306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607582#comment-17607582
 ] 

ASF GitHub Bot commented on YARN-11306:
---------------------------------------

slfan1989 commented on code in PR #4897:
URL: https://github.com/apache/hadoop/pull/4897#discussion_r976164888


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,181 @@ public void testRemoveAppFromRegistryApplicationFailed()
       return null;
     });
   }
+
+  public void testRecoverWithBadSubCluster(final RegistryOperations 
registryObj)
+      throws IOException, InterruptedException {
+
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    // Prepare a list of subclusters
+    List<SubClusterId> subClusterIds = new ArrayList<>();
+    SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+    SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+    SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+    subClusterIds.add(sc1);
+    subClusterIds.add(sc2);
+    subClusterIds.add(homeSC);
+
+    // Prepare AMRMProxy Context
+    AMRMProxyApplicationContext appContext = new 
AMRMProxyApplicationContextImpl(nmContext,
+        getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+    // Prepare RegisterApplicationMasterRequest
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor
+      initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+      // Step2. Register Application And Assign Containers
+      List<Container> containers = 
registerApplicationAndAssignContainers(registerReq);
+
+      // Step3. Offline SC-1 cluster
+      offlineSubClusterSC1(sc1);
+
+      // Step4. Recover ApplicationMaster
+      recoverApplicationMaster(appContext);
+
+      // Step5. We recovered ApplicationMaster.
+      // SC-1 was offline, SC-2 was recovered at this time, 
UnmanagedAMPool.size=1 and only SC-2
+      UnmanagedAMPoolManager unmanagedAMPoolManager = 
interceptor.getUnmanagedAMPool();
+      Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+      Assert.assertNotNull(allUAMIds);
+      Assert.assertTrue(allUAMIds.size() == 1);
+      Assert.assertTrue(allUAMIds.contains(sc2.getId()));
+
+      // Step6. The first allocate call expects a fail-over exception and 
re-register.
+      AllocateRequest allocateRequest = 
Records.newRecord(AllocateRequest.class);
+      allocateRequest.setResponseId(0);
+      LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class,
+          "AMRMProxy just restarted and recovered for " + this.attemptId +
+          ". AM should re-register and full re-send pending requests.",
+          () -> interceptor.allocate(allocateRequest));
+      interceptor.registerApplicationMaster(registerReq);
+
+      // Step7. release Containers
+      releaseContainers(containers, sc1);
+
+      // Step8. finish application
+      finishApplication();
+
+      return null;
+    });
+  }
+
+  private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds,
+      RegistryOperations registryObj) throws YarnException {
+    // Prepare subClusters SC-1, SC-2, HomeSC
+    for (SubClusterId subClusterId : subClusterIds) {
+      registerSubCluster(subClusterId);
+    }
+
+    // Prepare Interceptor
+    interceptor = new TestableFederationInterceptor();
+    AMRMProxyApplicationContext appContext = new 
AMRMProxyApplicationContextImpl(nmContext,
+        getConf(), attemptId, "test-user", null, null, null, registryObj);
+    interceptor.init(appContext);
+    interceptor.cleanupRegistry();
+  }
+
+  private List<Container> registerApplicationAndAssignContainers(
+      RegisterApplicationMasterRequest registerReq) throws Exception {
+
+    // Register HomeSC
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    // We only registered HomeSC, so UnmanagedAMPoolSize should be empty
+    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+    // We assign 3 Containers to each cluster
+    int numberOfContainers = 3;
+    List<Container> containers =
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 3);
+
+    // At this point, UnmanagedAMPoolSize should be equal to 2 and should 
contain SC-1, SC-2
+    Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+    UnmanagedAMPoolManager unmanagedAMPoolManager = 
interceptor.getUnmanagedAMPool();
+    Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+    Assert.assertNotNull(allUAMIds);
+    Assert.assertTrue(allUAMIds.size() == 2);
+    Assert.assertTrue(allUAMIds.contains("SC-1"));
+    Assert.assertTrue(allUAMIds.contains("SC-2"));
+
+    // Make sure all async hb threads are done
+    interceptor.drainAllAsyncQueue(true);
+
+    return containers;
+  }
+
+  private void offlineSubClusterSC1(SubClusterId subClusterId) throws 
YarnException {
+
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+
+    // SC-1 out of service
+    deRegisterSubCluster(subClusterId);
+    secondaries.get(subClusterId.getId()).setRunningMode(false);
+  }
+
+  private void recoverApplicationMaster(AMRMProxyApplicationContext appContext)
+      throws IOException {
+    // Prepare for Federation Interceptor restart and recover
+    Map<String, byte[]> recoveredDataMap =
+        recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+    // Preserve the mock RM instances
+    MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+
+    // Create a new interceptor instance and recover
+    interceptor = new TestableFederationInterceptor(homeRM,
+        interceptor.getSecondaryRMs());
+    interceptor.init(appContext);
+    interceptor.recover(recoveredDataMap);
+  }
+
+  private void releaseContainers(List<Container> containers, SubClusterId 
subClusterId)
+      throws Exception {
+
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+    lastResponseId = 0;
+
+    // Get the Container list of SC-1
+    MockResourceManagerFacade sc1Facade = secondaries.get("SC-1");
+    HashMap<ApplicationId, List<ContainerId>> appContainerMap =
+            sc1Facade.getApplicationContainerIdMap();
+    Assert.assertNotNull(appContainerMap);
+    ApplicationId applicationId = attemptId.getApplicationId();
+    Assert.assertNotNull(applicationId);
+    List<ContainerId> sc1ContainerList = appContainerMap.get(applicationId);
+
+    // Release all containers,
+    // Because SC-1 is offline, it is necessary to clean up the Containers 
allocated by SC-1
+    containers = containers.stream()
+        .filter(container -> !sc1ContainerList.contains(container.getId()))
+        .collect(Collectors.toList());
+    releaseContainersAndAssert(containers);
+  }
+
+  private void finishApplication() throws IOException, YarnException {
+    // Finish the application
+    FinishApplicationMasterRequest finishReq =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setDiagnostics("");
+    finishReq.setTrackingUrl("");
+    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+    FinishApplicationMasterResponse finishResponse =
+        interceptor.finishApplicationMaster(finishReq);
+    Assert.assertNotNull(finishResponse);
+    Assert.assertEquals(true, finishResponse.getIsUnregistered());

Review Comment:
   I will fix it.





> Refactor NM#FederationInterceptor#recover Code
> ----------------------------------------------
>
>                 Key: YARN-11306
>                 URL: https://issues.apache.org/jira/browse/YARN-11306
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: federation, nodemanager
>    Affects Versions: 3.4.0
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>
> Refactor NM#FederationInterceptor#recover Code
> 1.Enhance code readability
> 2.Add empty check
> 3.When an exception is encountered, completely destroy the data of SubCluster



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to