This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d1f8894328b YARN-11420. Stabilize TestNMClient (#5317)
5d1f8894328b is described below

commit 5d1f8894328b5536255660a9bcf6a207d125f5c1
Author: K0K0V0K <109747532+k0k0...@users.noreply.github.com>
AuthorDate: Tue Dec 5 11:32:42 2023 +0100

    YARN-11420. Stabilize TestNMClient (#5317)
---
 .../hadoop/yarn/client/api/impl/TestNMClient.java  | 687 +++++++--------------
 1 file changed, 219 insertions(+), 468 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index eb2ecb96cf20..ce24d75d629f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -23,28 +23,23 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.NMClient;
@@ -60,53 +55,48 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.function.ThrowingRunnable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class TestNMClient {
-  Configuration conf = null;
-  MiniYARNCluster yarnCluster = null;
-  YarnClientImpl yarnClient = null;
-  AMRMClientImpl<ContainerRequest> rmClient = null;
-  NMClientImpl nmClient = null;
-  List<NodeReport> nodeReports = null;
-  ApplicationAttemptId attemptId = null;
-  int nodeCount = 3;
-  NMTokenCache nmTokenCache = null;
+  private static final String IS_NOT_HANDLED_BY_THIS_NODEMANAGER =
+      "is not handled by this NodeManager";
+  private static final String UNKNOWN_CONTAINER =
+      "Unknown container";
+
+  private static final int NUMBER_OF_CONTAINERS = 5;
+  private Configuration conf;
+  private MiniYARNCluster yarnCluster;
+  private YarnClientImpl yarnClient;
+  private AMRMClientImpl<ContainerRequest> rmClient;
+  private NMClientImpl nmClient;
+  private List<NodeReport> nodeReports;
+  private NMTokenCache nmTokenCache;
+  private RMAppAttempt appAttempt;
 
   /**
    * Container State transition listener to track the number of times
    * a container has transitioned into a state.
    */
-  public static class DebugSumContainerStateListener
-      implements ContainerStateTransitionListener {
-
-    private static final Logger LOG =
-        LoggerFactory.getLogger(DebugSumContainerStateListener.class);
-    private static final Map<ContainerId,
-        Map<org.apache.hadoop.yarn.server.nodemanager.containermanager
-            .container.ContainerState, Long>>
-        TRANSITION_COUNTER = new HashMap<>();
+  public static class DebugSumContainerStateListener implements 
ContainerStateTransitionListener {
+    public static final Map<ContainerId, Integer> RUNNING_TRANSITIONS = new 
ConcurrentHashMap<>();
 
     public void init(Context context) {
     }
@@ -125,576 +115,337 @@ public class TestNMClient {
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container
             .ContainerState afterState,
         ContainerEvent processedEvent) {
-      synchronized (TRANSITION_COUNTER) {
-        if (beforeState != afterState) {
-          ContainerId id = op.getContainerId();
-          TRANSITION_COUNTER
-              .putIfAbsent(id, new HashMap<>());
-          long sum = TRANSITION_COUNTER.get(id)
-              .compute(afterState,
-                  (state, count) -> count == null ? 1 : count + 1);
-          LOG.info("***** " + id +
-              " Transition from " + beforeState +
-              " to " + afterState +
-              "sum:" + sum);
-        }
+      if (beforeState != afterState &&
+          afterState == 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING) {
+        RUNNING_TRANSITIONS.compute(op.getContainerId(),
+            (containerId, counter) -> counter == null ? 1 : ++counter);
       }
     }
-
-    /**
-     * Get the current number of state transitions.
-     * This is useful to check, if an event has occurred in unit tests.
-     * @param id Container id to check
-     * @param state Return the overall number of transitions to this state
-     * @return Number of transitions to the state specified
-     */
-    static long getTransitionCounter(ContainerId id,
-                                     org.apache.hadoop.yarn.server.nodemanager
-                                         .containermanager.container
-                                         .ContainerState state) {
-      Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
-          .get(state);
-      return ret != null ? ret : 0;
-    }
   }
 
-  @Before
-  public void setup() throws YarnException, IOException {
-    // start minicluster
+  public void setup() throws YarnException, IOException, InterruptedException, 
TimeoutException {
     conf = new YarnConfiguration();
-    // Turn on state tracking
     conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
         DebugSumContainerStateListener.class.getName());
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYarnCluster();
+    startYarnClient();
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    nmTokenCache = new NMTokenCache();
+    startRMClient();
+    startNMClient();
+  }
+
+
+  private void startYarnCluster() {
+    yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
-    assertNotNull(yarnCluster);
     assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+  }
 
-    // start rm client
+  private void startYarnClient()
+      throws IOException, YarnException, InterruptedException, 
TimeoutException {
     yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
     yarnClient.init(conf);
     yarnClient.start();
-    assertNotNull(yarnClient);
     assertEquals(STATE.STARTED, yarnClient.getServiceState());
-
-    // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
 
-    // submit new app
-    ApplicationSubmissionContext appContext = 
+    ApplicationSubmissionContext appContext =
         yarnClient.createApplication().getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
-    // set the application name
     appContext.setApplicationName("Test");
-    // Set the priority for the application master
     Priority pri = Priority.newInstance(0);
     appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
     appContext.setAMContainerSpec(amContainer);
-    // unmanaged AM
     appContext.setUnmanagedAM(true);
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
+
+    SubmitApplicationRequest appRequest = 
Records.newRecord(SubmitApplicationRequest.class);
     appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
     yarnClient.submitApplication(appContext);
+    GenericTestUtils.waitFor(() -> 
yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt().getAppAttemptState() == 
RMAppAttemptState.LAUNCHED,
+        100, 30_000, "Failed to start app");
+    appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+        .get(appId).getCurrentAppAttempt();
+  }
 
-    // wait for app to start
-    int iterationsLeft = 30;
-    RMAppAttempt appAttempt = null;
-    while (iterationsLeft > 0) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-      sleep(1000);
-      --iterationsLeft;
-    }
-    if (iterationsLeft == 0) {
-      fail("Application hasn't bee started");
-    }
-
-    // Just dig into the ResourceManager and get the AMRMToken just for the 
sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-
-    //creating an instance NMTokenCase
-    nmTokenCache = new NMTokenCache();
-    
-    // start am rm client
-    rmClient =
-        (AMRMClientImpl<ContainerRequest>) AMRMClient
-          .<ContainerRequest> createAMRMClient();
-
-    //setting an instance NMTokenCase
+  private void startRMClient() {
+    rmClient = (AMRMClientImpl<ContainerRequest>) 
AMRMClient.createAMRMClient();
     rmClient.setNMTokenCache(nmTokenCache);
     rmClient.init(conf);
     rmClient.start();
-    assertNotNull(rmClient);
     assertEquals(STATE.STARTED, rmClient.getServiceState());
+  }
 
-    // start am nm client
+  private void startNMClient() {
     nmClient = (NMClientImpl) NMClient.createNMClient();
-    
-    //propagating the AMRMClient NMTokenCache instance
     nmClient.setNMTokenCache(rmClient.getNMTokenCache());
     nmClient.init(conf);
     nmClient.start();
-    assertNotNull(nmClient);
     assertEquals(STATE.STARTED, nmClient.getServiceState());
   }
 
-  @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException {
     rmClient.stop();
     yarnClient.stop();
     yarnCluster.stop();
   }
 
-  private void stopNmClient(boolean stopContainers) {
-    assertNotNull("Null nmClient", nmClient);
-    // leave one unclosed
-    assertEquals(1, nmClient.startedContainers.size());
-    // default true
-    assertTrue(nmClient.getCleanupRunningContainers().get());
-    nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
-    nmClient.stop();
-  }
-
-  @Test (timeout = 180000)
+  @Test (timeout = 180_000)
   public void testNMClientNoCleanupOnStop()
-      throws YarnException, IOException {
-
-    rmClient.registerApplicationMaster("Host", 10000, "");
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      stopNmClient();
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainers();
+      assertEquals(0, nmClient.startedContainers.size());
+    });
+  }
 
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+  @Test (timeout = 200_000)
+  public void testNMClient()
+      throws YarnException, IOException, InterruptedException, 
TimeoutException {
+    runTest(() -> {
+      // stop the running containers on close
+      assertFalse(nmClient.startedContainers.isEmpty());
+      nmClient.cleanupRunningContainersOnStop(true);
+      assertTrue(nmClient.getCleanupRunningContainers().get());
+      nmClient.stop();
+    });
+  }
 
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // don't stop the running containers
-    stopNmClient(false);
-    assertFalse(nmClient.startedContainers.isEmpty());
-    //now cleanup
-    nmClient.cleanupRunningContainers();
-    assertEquals(0, nmClient.startedContainers.size());
+  public void runTest(
+      Runnable test
+  ) throws IOException, InterruptedException, YarnException, TimeoutException {
+    setup();
+    rmClient.registerApplicationMaster("Host", 10_000, "");
+    testContainerManagement(nmClient, allocateContainers(rmClient));
+    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, null);
+    test.run();
+    tearDown();
   }
 
-  @Test (timeout = 200000)
-  public void testNMClient()
-      throws YarnException, IOException {
-    rmClient.registerApplicationMaster("Host", 10000, "");
-
-    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-    
-    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-        null, null);
-    // stop the running containers on close
-    assertFalse(nmClient.startedContainers.isEmpty());
-    nmClient.cleanupRunningContainersOnStop(true);
+  private void stopNmClient() {
+    assertNotNull("Null nmClient", nmClient);
+    // leave one unclosed
+    assertEquals(1, nmClient.startedContainers.size());
+    // default true
     assertTrue(nmClient.getCleanupRunningContainers().get());
+    nmClient.cleanupRunningContainersOnStop(false);
+    assertFalse(nmClient.getCleanupRunningContainers().get());
     nmClient.stop();
   }
 
   private Set<Container> allocateContainers(
-      AMRMClientImpl<ContainerRequest> rmClient, int num)
-      throws YarnException, IOException {
-    // setup container request
-    Resource capability = Resource.newInstance(1024, 0);
-    Priority priority = Priority.newInstance(0);
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String rack = nodeReports.get(0).getRackName();
-    String[] nodes = new String[] {node};
-    String[] racks = new String[] {rack};
-
-    for (int i = 0; i < num; ++i) {
-      rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
-          racks, priority));
+      AMRMClientImpl<ContainerRequest> client
+  ) throws YarnException, IOException {
+    for (int i = 0; i < NUMBER_OF_CONTAINERS; ++i) {
+      client.addContainerRequest(new ContainerRequest(
+          Resource.newInstance(1024, 0),
+          new String[] {nodeReports.get(0).getNodeId().getHost()},
+          new String[] {nodeReports.get(0).getRackName()},
+          Priority.newInstance(0)
+      ));
     }
-
-    int containersRequestedAny = rmClient.getTable(0)
-        .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
-            capability).remoteRequest.getNumContainers();
-
-    // RM should allocate container within 2 calls to allocate()
-    int allocatedContainerCount = 0;
-    int iterationsLeft = 2;
-    Set<Container> containers = new TreeSet<Container>();
-    while (allocatedContainerCount < containersRequestedAny
-        && iterationsLeft > 0) {
-      AllocateResponse allocResponse = rmClient.allocate(0.1f);
-
-      allocatedContainerCount += allocResponse.getAllocatedContainers().size();
-      for(Container container : allocResponse.getAllocatedContainers()) {
-        containers.add(container);
+    Set<Container> allocatedContainers = new TreeSet<>();
+    while (allocatedContainers.size() < NUMBER_OF_CONTAINERS) {
+      AllocateResponse allocResponse = client.allocate(0.1f);
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      for (NMToken token : allocResponse.getNMTokens()) {
+        client.getNMTokenCache().setToken(token.getNodeId().toString(), 
token.getToken());
       }
-      if (!allocResponse.getNMTokens().isEmpty()) {
-        for (NMToken token : allocResponse.getNMTokens()) {
-          rmClient.getNMTokenCache().setToken(token.getNodeId().toString(),
-              token.getToken());
-        }
+      if (allocatedContainers.size() < NUMBER_OF_CONTAINERS) {
+        sleep(100);
       }
-      if(allocatedContainerCount < containersRequestedAny) {
-        // sleep to let NM's heartbeat to RM and trigger allocations
-        sleep(1000);
-      }
-
-      --iterationsLeft;
     }
-    return containers;
+    return allocatedContainers;
   }
 
-  private void testContainerManagement(NMClientImpl client,
-      Set<Container> containers) throws YarnException, IOException {
+  private void testContainerManagement(
+      NMClientImpl client, Set<Container> containers
+  ) throws YarnException, IOException {
     int size = containers.size();
     int i = 0;
     for (Container container : containers) {
       // getContainerStatus shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.getContainerStatus(container.getId(), container.getNodeId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("is not handled by this NodeManager"));
-      }
+      assertYarnException(
+          () -> client.getContainerStatus(container.getId(), 
container.getNodeId()),
+          IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
       // upadateContainerResource shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.updateContainerResource(container);
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("is not handled by this NodeManager"));
-      }
-
+      assertYarnException(
+          () -> client.updateContainerResource(container),
+          IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
       // restart shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.restartContainer(container.getId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("Unknown container"));
-      }
-
+      assertYarnException(
+          () -> client.restartContainer(container.getId()),
+          UNKNOWN_CONTAINER);
       // rollback shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.rollbackLastReInitialization(container.getId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("Unknown container"));
-      }
-
+      assertYarnException(
+          () -> client.rollbackLastReInitialization(container.getId()),
+          UNKNOWN_CONTAINER);
       // commit shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
-      try {
-        client.commitLastReInitialization(container.getId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        assertTrue("The thrown exception is not expected",
-            e.getMessage().contains("Unknown container"));
-      }
-
+      assertYarnException(
+          () -> client.commitLastReInitialization(container.getId()),
+          UNKNOWN_CONTAINER);
       // stopContainer shouldn't be called before startContainer,
       // otherwise, an exception will be thrown
-      try {
-        client.stopContainer(container.getId(), container.getNodeId());
-        fail("Exception is expected");
-      } catch (YarnException e) {
-        if (!e.getMessage()
-              .contains("is not handled by this NodeManager")) {
-          throw new AssertionError("Exception is not expected: ", e);
-        }
-      }
+      assertYarnException(
+          () -> client.stopContainer(container.getId(), container.getNodeId()),
+          IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
 
       Credentials ts = new Credentials();
       DataOutputBuffer dob = new DataOutputBuffer();
       ts.writeTokenStorageToStream(dob);
-      ByteBuffer securityTokens =
-          ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-      ContainerLaunchContext clc =
-          Records.newRecord(ContainerLaunchContext.class);
-      if (Shell.WINDOWS) {
-        clc.setCommands(
-            Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul"));
-      } else {
-        clc.setCommands(Arrays.asList("sleep", "1000000"));
-      }
+      ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
+      ContainerLaunchContext clc = 
Records.newRecord(ContainerLaunchContext.class);
+      clc.setCommands(Shell.WINDOWS
+          ? Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul")
+          : Arrays.asList("sleep", "1000000")
+      );
       clc.setTokens(securityTokens);
-      try {
-        client.startContainer(container, clc);
-      } catch (YarnException e) {
-        throw new AssertionError("Exception is not expected ", e);
-      }
-      List<Integer> exitStatuses = Collections.singletonList(-1000);
+      client.startContainer(container, clc);
+      List<Integer> exitStatuses = Arrays.asList(-1000, -105);
 
       // leave one container unclosed
       if (++i < size) {
         testContainer(client, i, container, clc, exitStatuses);
-
       }
     }
   }
 
   private void testContainer(NMClientImpl client, int i, Container container,
                              ContainerLaunchContext clc, List<Integer> 
exitCode)
-      throws YarnException, IOException {
-    // NodeManager may still need some time to make the container started
+          throws YarnException, IOException {
     testGetContainerStatus(container, i, ContainerState.RUNNING, "",
-        exitCode);
-    waitForContainerTransitionCount(container,
-        org.apache.hadoop.yarn.server.nodemanager.
-            containermanager.container.ContainerState.RUNNING, 1);
-    // Test increase container API and make sure requests can reach NM
+            exitCode);
+    waitForContainerRunningTransitionCount(container, 1);
     testIncreaseContainerResource(container);
-
-    testRestartContainer(container.getId());
+    testRestartContainer(container);
     testGetContainerStatus(container, i, ContainerState.RUNNING,
-        "will be Restarted", exitCode);
-    waitForContainerTransitionCount(container,
-        org.apache.hadoop.yarn.server.nodemanager.
-            containermanager.container.ContainerState.RUNNING, 2);
-
+            "will be Restarted", exitCode);
+    waitForContainerRunningTransitionCount(container, 2);
     if (i % 2 == 0) {
-      testReInitializeContainer(container.getId(), clc, false);
+      testReInitializeContainer(container, clc, false);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Re-initialized", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 3);
-
-      testRollbackContainer(container.getId(), false);
+              "will be Re-initialized", exitCode);
+      waitForContainerRunningTransitionCount(container, 3);
+      testContainerRollback(container, true);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Rolled-back", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 4);
-
-      testCommitContainer(container.getId(), true);
-      testReInitializeContainer(container.getId(), clc, false);
+              "will be Rolled-back", exitCode);
+      waitForContainerRunningTransitionCount(container, 4);
+      testContainerCommit(container, false);
+      testReInitializeContainer(container, clc, false);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Re-initialized", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 5);
-      testCommitContainer(container.getId(), false);
+              "will be Re-initialized", exitCode);
+      waitForContainerRunningTransitionCount(container, 5);
+      testContainerCommit(container, true);
     } else {
-      testReInitializeContainer(container.getId(), clc, true);
+      testReInitializeContainer(container, clc, true);
       testGetContainerStatus(container, i, ContainerState.RUNNING,
-          "will be Re-initialized", exitCode);
-      waitForContainerTransitionCount(container,
-          org.apache.hadoop.yarn.server.nodemanager.
-              containermanager.container.ContainerState.RUNNING, 3);
-      testRollbackContainer(container.getId(), true);
-      testCommitContainer(container.getId(), true);
+              "will be Re-initialized", exitCode);
+      waitForContainerRunningTransitionCount(container, 3);
+      testContainerRollback(container, false);
+      testContainerCommit(container, false);
     }
-
-    try {
-      client.stopContainer(container.getId(), container.getNodeId());
-    } catch (YarnException e) {
-      throw (AssertionError)
-        (new AssertionError("Exception is not expected: " + e, e));
-    }
-
-    // getContainerStatus can be called after stopContainer
-    try {
-      // O is possible if CLEANUP_CONTAINER is executed too late
-      // -105 is possible if the container is not terminated but killed
-      testGetContainerStatus(container, i, ContainerState.COMPLETE,
-          "Container killed by the ApplicationMaster.",
-          Arrays.asList(
-              ContainerExitStatus.KILLED_BY_APPMASTER,
-              ContainerExitStatus.SUCCESS));
-    } catch (YarnException e) {
-      // The exception is possible because, after the container is stopped,
-      // it may be removed from NM's context.
-      if (!e.getMessage()
-            .contains("was recently stopped on node manager")) {
-        throw (AssertionError)
-          (new AssertionError("Exception is not expected: ", e));
-      }
-    }
-  }
-
-  /**
-   * Wait until the container reaches a state N times.
-   * @param container container to watch
-   * @param state state to test
-   * @param transitions the number N above
-   * @throws YarnException This happens if the test times out while waiting
-   */
-  private void waitForContainerTransitionCount(
-      Container container,
-      org.apache.hadoop.yarn.server.nodemanager.
-          containermanager.container.ContainerState state, long transitions)
-      throws YarnException {
-    long transitionCount = -1;
-    do {
-      if (transitionCount != -1) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          throw new YarnException(
-              "Timeout at transition count:" + transitionCount, e);
-        }
-      }
-      transitionCount = DebugSumContainerStateListener
-          .getTransitionCounter(container.getId(), state);
-    } while (transitionCount != transitions);
+    client.stopContainer(container.getId(), container.getNodeId());
+    testGetContainerStatus(container, i, ContainerState.COMPLETE,
+            "killed by the ApplicationMaster", exitCode);
   }
 
-  private void sleep(int sleepTime) {
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+  private void waitForContainerRunningTransitionCount(Container container, 
long transitions) {
+    while (DebugSumContainerStateListener.RUNNING_TRANSITIONS
+        .getOrDefault(container.getId(), 0) != transitions) {
+      sleep(500);
     }
   }
 
+
   private void testGetContainerStatus(Container container, int index,
-      ContainerState state, String diagnostics, List<Integer> exitStatuses)
+                                      ContainerState state, String diagnostics,
+                                      List<Integer> exitStatuses)
           throws YarnException, IOException {
     while (true) {
       sleep(250);
       ContainerStatus status = nmClient.getContainerStatus(
-          container.getId(), container.getNodeId());
+              container.getId(), container.getNodeId());
       // NodeManager may still need some time to get the stable
       // container status
       if (status.getState() == state) {
         assertEquals(container.getId(), status.getContainerId());
-        assertTrue("" + index + ": " + status.getDiagnostics(),
-            status.getDiagnostics().contains(diagnostics));
+        assertTrue(index + ": " + status.getDiagnostics(),
+                status.getDiagnostics().contains(diagnostics));
 
         assertTrue("Exit Statuses are supposed to be in: " + exitStatuses +
-                ", but the actual exit status code is: " +
-                status.getExitStatus(),
-            exitStatuses.contains(status.getExitStatus()));
+                        ", but the actual exit status code is: " +
+                        status.getExitStatus(),
+                exitStatuses.contains(status.getExitStatus()));
         break;
       }
     }
   }
 
   @SuppressWarnings("deprecation")
-  private void testIncreaseContainerResource(Container container)
-    throws YarnException, IOException {
-    try {
-      nmClient.increaseContainerResource(container);
-    } catch (YarnException e) {
-      // NM container increase container resource should fail without a version
-      // increase action to fail.
-      if (!e.getMessage().contains(
-          container.getId() + " has update version ")) {
-        throw (AssertionError)
-            (new AssertionError("Exception is not expected: " + e)
-                .initCause(e));
-      }
-    }
+  private void testIncreaseContainerResource(Container container) {
+    assertYarnException(
+        () -> nmClient.increaseContainerResource(container),
+        container.getId() + " has update version ");
   }
 
-  private void testRestartContainer(ContainerId containerId)
-      throws YarnException, IOException {
-    try {
-      sleep(250);
-      nmClient.restartContainer(containerId);
-      sleep(250);
-    } catch (YarnException e) {
-      // NM container will only be in SCHEDULED state, so expect the increase
-      // action to fail.
-      if (!e.getMessage().contains(
-          "can only be changed when a container is in RUNNING state")) {
-        throw (AssertionError)
-            (new AssertionError("Exception is not expected: " + e)
-                .initCause(e));
-      }
-    }
+  private void testRestartContainer(Container container) throws IOException, YarnException {
+    nmClient.restartContainer(container.getId());
   }
 
-  private void testRollbackContainer(ContainerId containerId,
-      boolean notRollbackable) throws YarnException, IOException {
-    try {
-      sleep(250);
-      nmClient.rollbackLastReInitialization(containerId);
-      if (notRollbackable) {
-        fail("Should not be able to rollback..");
-      }
-      sleep(250);
-    } catch (YarnException e) {
-      // NM container will only be in SCHEDULED state, so expect the increase
-      // action to fail.
-      if (notRollbackable) {
-        Assert.assertTrue(e.getMessage().contains(
-            "Nothing to rollback to"));
-      } else {
-        if (!e.getMessage().contains(
-            "can only be changed when a container is in RUNNING state")) {
-          throw (AssertionError)
-              (new AssertionError("Exception is not expected: " + e)
-                  .initCause(e));
-        }
-      }
+  private void testContainerRollback(Container container, boolean enabled)
+      throws IOException, YarnException {
+    if (enabled) {
+      nmClient.rollbackLastReInitialization(container.getId());
+    } else {
+      assertYarnException(
+          () -> nmClient.rollbackLastReInitialization(container.getId()),
+          "Nothing to rollback to");
     }
   }
 
-  private void testCommitContainer(ContainerId containerId,
-      boolean notCommittable) throws YarnException, IOException {
-    try {
-      nmClient.commitLastReInitialization(containerId);
-      if (notCommittable) {
-        fail("Should not be able to commit..");
-      }
-    } catch (YarnException e) {
-      // NM container will only be in SCHEDULED state, so expect the increase
-      // action to fail.
-      if (notCommittable) {
-        Assert.assertTrue(e.getMessage().contains(
-            "Nothing to Commit"));
-      } else {
-        if (!e.getMessage().contains(
-            "can only be changed when a container is in RUNNING state")) {
-          throw (AssertionError)
-              (new AssertionError("Exception is not expected: " + e)
-                  .initCause(e));
-        }
-      }
+  private void testContainerCommit(Container container, boolean enabled)
+      throws IOException, YarnException {
+    if (enabled) {
+      nmClient.commitLastReInitialization(container.getId());
+    } else {
+      assertYarnException(
+          () -> nmClient.commitLastReInitialization(container.getId()),
+          "Nothing to Commit");
     }
   }
 
-  private void testReInitializeContainer(ContainerId containerId,
-      ContainerLaunchContext clc, boolean autoCommit)
-      throws YarnException, IOException {
+  private void testReInitializeContainer(
+      Container container, ContainerLaunchContext clc, boolean autoCommit
+  ) throws IOException, YarnException {
+    nmClient.reInitializeContainer(container.getId(), clc, autoCommit);
+  }
+
+  private void assertYarnException(ThrowingRunnable runnable, String text) {
+    YarnException e = assertThrows(YarnException.class, runnable);
+    assertTrue(String.format("The thrown exception is not expected cause it has text [%s]"
+        + ", what not contains text [%s]", e.getMessage(), text), e.getMessage().contains(text));
+  }
+
+  private void sleep(int sleepTime) {
     try {
-      nmClient.reInitializeContainer(containerId, clc, autoCommit);
-    } catch (YarnException e) {
-      // NM container will only be in SCHEDULED state, so expect the increase
-      // action to fail.
-      if (!e.getMessage().contains(
-          "can only be changed when a container is in RUNNING state")) {
-        throw (AssertionError)
-            (new AssertionError("Exception is not expected: " + e)
-                .initCause(e));
-      }
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to