K0K0V0K commented on code in PR #5317:
URL: https://github.com/apache/hadoop/pull/5317#discussion_r1121779766


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java:
##########
@@ -125,576 +117,362 @@ public void postTransition(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container
             .ContainerState afterState,
         ContainerEvent processedEvent) {
-      synchronized (TRANSITION_COUNTER) {
-        if (beforeState != afterState) {
-          ContainerId id = op.getContainerId();
-          TRANSITION_COUNTER
-              .putIfAbsent(id, new HashMap<>());
-          long sum = TRANSITION_COUNTER.get(id)
-              .compute(afterState,
-                  (state, count) -> count == null ? 1 : count + 1);
-          LOG.info("***** " + id +
-              " Transition from " + beforeState +
-              " to " + afterState +
-              "sum:" + sum);
-        }
+      if (beforeState != afterState &&
+        afterState == 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING) {
+        RUNNING_TRANSITIONS.compute(op.getContainerId(),
+            (containerId, counter) -> counter == null ? 1 : ++counter);
       }
     }
-
-    /**
-     * Get the current number of state transitions.
-     * This is useful to check, if an event has occurred in unit tests.
-     * @param id Container id to check
-     * @param state Return the overall number of transitions to this state
-     * @return Number of transitions to the state specified
-     */
-    static long getTransitionCounter(ContainerId id,
-                                     org.apache.hadoop.yarn.server.nodemanager
-                                         .containermanager.container
-                                         .ContainerState state) {
-      Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
-          .get(state);
-      return ret != null ? ret : 0;
-    }
   }
 
-  @Before
-  public void setup() throws YarnException, IOException {
-    // start minicluster
+  public void setup() throws YarnException, IOException, InterruptedException, 
TimeoutException {
     conf = new YarnConfiguration();
-    // Turn on state tracking
     conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
         DebugSumContainerStateListener.class.getName());
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYarnCluster();
+    startYarnClient();
+    UserGroupInformation.setLoginUser(UserGroupInformation
+      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    nmTokenCache = new NMTokenCache();
+    startRMClient();
+    startNMClient();
+  }
+
+
+  private void startYarnCluster() {
+    yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
-    assertNotNull(yarnCluster);
     assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+  }
 
-    // start rm client
+  private void startYarnClient()
+      throws IOException, YarnException, InterruptedException, 
TimeoutException {
     yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
     yarnClient.init(conf);
     yarnClient.start();
-    assertNotNull(yarnClient);
     assertEquals(STATE.STARTED, yarnClient.getServiceState());
-
-    // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
-
-    // submit new app
-    ApplicationSubmissionContext appContext = 
+    ApplicationSubmissionContext appContext =
         yarnClient.createApplication().getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
-    // set the application name
     appContext.setApplicationName("Test");
-    // Set the priority for the application master
     Priority pri = Priority.newInstance(0);
     appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
     appContext.setAMContainerSpec(amContainer);
-    // unmanaged AM
     appContext.setUnmanagedAM(true);
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
+    SubmitApplicationRequest appRequest = 
Records.newRecord(SubmitApplicationRequest.class);
     appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
     yarnClient.submitApplication(appContext);
+    GenericTestUtils.waitFor(() -> 
yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt().getAppAttemptState() == 
RMAppAttemptState.LAUNCHED,
+        100, 30_000, "Failed to start app");
+    appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt();
+  }
 
-    // wait for app to start
-    int iterationsLeft = 30;
-    RMAppAttempt appAttempt = null;
-    while (iterationsLeft > 0) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-      sleep(1000);
-      --iterationsLeft;
-    }
-    if (iterationsLeft == 0) {
-      fail("Application hasn't bee started");
-    }
-
-    // Just dig into the ResourceManager and get the AMRMToken just for the 
sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-
-    //creating an instance NMTokenCase
-    nmTokenCache = new NMTokenCache();
-    
-    // start am rm client
-    rmClient =
-        (AMRMClientImpl<ContainerRequest>) AMRMClient
-          .<ContainerRequest> createAMRMClient();
-
-    //setting an instance NMTokenCase
+  private void startRMClient() {
+    rmClient = (AMRMClientImpl<ContainerRequest>) 
AMRMClient.createAMRMClient();
     rmClient.setNMTokenCache(nmTokenCache);
     rmClient.init(conf);
     rmClient.start();
-    assertNotNull(rmClient);
     assertEquals(STATE.STARTED, rmClient.getServiceState());
+  }
 
-    // start am nm client
+  private void startNMClient() {
     nmClient = (NMClientImpl) NMClient.createNMClient();
-    
-    //propagating the AMRMClient NMTokenCache instance
     nmClient.setNMTokenCache(rmClient.getNMTokenCache());
     nmClient.init(conf);
     nmClient.start();
-    assertNotNull(nmClient);
     assertEquals(STATE.STARTED, nmClient.getServiceState());
   }
 
-  @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     rmClient.stop();
     yarnClient.stop();
-    yarnCluster.stop();
+    yarnCluster.asyncStop(this);
   }
 
-  private void stopNmClient(boolean stopContainers) {
-    assertNotNull("Null nmClient", nmClient);
-    // leave one unclosed
-    assertEquals(1, nmClient.startedContainers.size());
-    // default true
-    assertTrue(nmClient.getCleanupRunningContainers().get());
-    nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
-    nmClient.stop();
-  }
-
-  @Test (timeout = 180000)
+  @Test (timeout = 180_000 * MAX_EARLY_FINISH)
   public void testNMClientNoCleanupOnStop()
-      throws YarnException, IOException {
-
-    rmClient.registerApplicationMaster("Host", 10000, "");
-
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // don't stop the running containers
-    stopNmClient(false);
-    assertFalse(nmClient.startedContainers.isEmpty());
-    //now cleanup
-    nmClient.cleanupRunningContainers();
-    assertEquals(0, nmClient.startedContainers.size());
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    int earlyFinishCounter = MAX_EARLY_FINISH;
+    while (0 < earlyFinishCounter) {
+      try {
+        setup();
+        rmClient.registerApplicationMaster("Host", 10_000, "");
+        testContainerManagement(nmClient, allocateContainers(rmClient));
+        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+        stopNmClient();
+        assertFalse(nmClient.startedContainers.isEmpty());
+        nmClient.cleanupRunningContainers();
+        assertEquals(0, nmClient.startedContainers.size());
+        return;
+      } catch (EarlyFinishException e) {
+        --earlyFinishCounter;
+      } finally {
+        tearDown();
+      }
+    }
+    if (earlyFinishCounter == 0) {
+      fail("Too many early finish exception happened");
+    }
   }
 
-  @Test (timeout = 200000)
+  @Test (timeout = 200_000 * MAX_EARLY_FINISH)
   public void testNMClient()
-      throws YarnException, IOException {
-    rmClient.registerApplicationMaster("Host", 10000, "");
-
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-    
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // stop the running containers on close
-    assertFalse(nmClient.startedContainers.isEmpty());
-    nmClient.cleanupRunningContainersOnStop(true);
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    int earlyFinishCounter = MAX_EARLY_FINISH;
+    while (0 < earlyFinishCounter) {
+      try {
+        setup();
+        rmClient.registerApplicationMaster("Host", 10_000, "");
+        testContainerManagement(nmClient, allocateContainers(rmClient));
+        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+        // stop the running containers on close
+        assertFalse(nmClient.startedContainers.isEmpty());
+        nmClient.cleanupRunningContainersOnStop(true);
+        assertTrue(nmClient.getCleanupRunningContainers().get());
+        nmClient.stop();
+        return;
+      } catch (EarlyFinishException e) {
+        --earlyFinishCounter;
+      } finally {
+        tearDown();
+      }
+    }
+    if (earlyFinishCounter == 0) {
+      fail("Too many early finish exception happened");
+    }
+  }
+
+  private void stopNmClient() {
+    assertNotNull("Null nmClient", nmClient);
+    // leave one unclosed
+    assertEquals(1, nmClient.startedContainers.size());
+    // default true
     assertTrue(nmClient.getCleanupRunningContainers().get());
+    nmClient.cleanupRunningContainersOnStop(false);
+    assertFalse(nmClient.getCleanupRunningContainers().get());
     nmClient.stop();
   }
 
   private Set<Container> allocateContainers(
-      AMRMClientImpl<ContainerRequest> rmClient, int num)
-      throws YarnException, IOException {
-    // setup container request
-    Resource capability = Resource.newInstance(1024, 0);
-    Priority priority = Priority.newInstance(0);
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String rack = nodeReports.get(0).getRackName();
-    String[] nodes = new String[] {node};
-    String[] racks = new String[] {rack};
-
-    for (int i = 0; i < num; ++i) {
-      rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
-          racks, priority));
+      AMRMClientImpl<ContainerRequest> rmClient
+  ) throws YarnException, IOException {
+    for (int i = 0; i < NUMBER_OF_CONTAINERS; ++i) {
+      rmClient.addContainerRequest(new ContainerRequest(
+          Resource.newInstance(256, 0),

Review Comment:
   I think a sleep job should not need 1 GB of memory. In my observations, 
Hadoop can run it in only 256 MB, so I reduced the requested memory and am 
keeping my fingers crossed that at least 256 MB is free during test runs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to