This is an automated email from the ASF dual-hosted git repository. bteke pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 5d1f8894328b YARN-11420. Stabilize TestNMClient (#5317) 5d1f8894328b is described below commit 5d1f8894328b5536255660a9bcf6a207d125f5c1 Author: K0K0V0K <109747532+k0k0...@users.noreply.github.com> AuthorDate: Tue Dec 5 11:32:42 2023 +0100 YARN-11420. Stabilize TestNMClient (#5317) --- .../hadoop/yarn/client/api/impl/TestNMClient.java | 687 +++++++-------------- 1 file changed, 219 insertions(+), 468 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index eb2ecb96cf20..ce24d75d629f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -23,28 +23,23 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMClient; @@ -60,53 +55,48 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; + import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.junit.function.ThrowingRunnable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class TestNMClient { - Configuration conf = null; - MiniYARNCluster yarnCluster = null; - YarnClientImpl yarnClient = null; - AMRMClientImpl<ContainerRequest> rmClient = null; - NMClientImpl nmClient = null; - List<NodeReport> nodeReports = null; - ApplicationAttemptId attemptId = null; - int nodeCount = 3; - NMTokenCache nmTokenCache = null; + private static final String IS_NOT_HANDLED_BY_THIS_NODEMANAGER = + "is not handled by this NodeManager"; + private static final String UNKNOWN_CONTAINER = + "Unknown container"; + + private static final int NUMBER_OF_CONTAINERS = 5; + private Configuration conf; + private MiniYARNCluster yarnCluster; + private YarnClientImpl yarnClient; + private AMRMClientImpl<ContainerRequest> rmClient; + private NMClientImpl nmClient; + private List<NodeReport> nodeReports; + private NMTokenCache nmTokenCache; + private RMAppAttempt appAttempt; /** * Container State transition listener to track the number of times * a container has transitioned into a state. */ - public static class DebugSumContainerStateListener - implements ContainerStateTransitionListener { - - private static final Logger LOG = - LoggerFactory.getLogger(DebugSumContainerStateListener.class); - private static final Map<ContainerId, - Map<org.apache.hadoop.yarn.server.nodemanager.containermanager - .container.ContainerState, Long>> - TRANSITION_COUNTER = new HashMap<>(); + public static class DebugSumContainerStateListener implements ContainerStateTransitionListener { + public static final Map<ContainerId, Integer> RUNNING_TRANSITIONS = new ConcurrentHashMap<>(); public void init(Context context) { } @@ -125,576 +115,337 @@ public class TestNMClient { org.apache.hadoop.yarn.server.nodemanager.containermanager.container .ContainerState afterState, ContainerEvent processedEvent) { - synchronized (TRANSITION_COUNTER) { - if (beforeState != afterState) { - ContainerId id = op.getContainerId(); - TRANSITION_COUNTER - .putIfAbsent(id, new HashMap<>()); - long sum = TRANSITION_COUNTER.get(id) - .compute(afterState, - (state, count) -> count == null ? 1 : count + 1); - LOG.info("***** " + id + - " Transition from " + beforeState + - " to " + afterState + - "sum:" + sum); - } + if (beforeState != afterState && + afterState == org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RUNNING) { + RUNNING_TRANSITIONS.compute(op.getContainerId(), + (containerId, counter) -> counter == null ? 1 : ++counter); } } - - /** - * Get the current number of state transitions. - * This is useful to check, if an event has occurred in unit tests. - * @param id Container id to check - * @param state Return the overall number of transitions to this state - * @return Number of transitions to the state specified - */ - static long getTransitionCounter(ContainerId id, - org.apache.hadoop.yarn.server.nodemanager - .containermanager.container - .ContainerState state) { - Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>()) - .get(state); - return ret != null ? ret : 0; - } } - @Before - public void setup() throws YarnException, IOException { - // start minicluster + public void setup() throws YarnException, IOException, InterruptedException, TimeoutException { conf = new YarnConfiguration(); - // Turn on state tracking conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS, DebugSumContainerStateListener.class.getName()); - yarnCluster = - new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); + startYarnCluster(); + startYarnClient(); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + nmTokenCache = new NMTokenCache(); + startRMClient(); + startNMClient(); + } + + + private void startYarnCluster() { + yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1); yarnCluster.init(conf); yarnCluster.start(); - assertNotNull(yarnCluster); assertEquals(STATE.STARTED, yarnCluster.getServiceState()); + } - // start rm client + private void startYarnClient() + throws IOException, YarnException, InterruptedException, TimeoutException { yarnClient = (YarnClientImpl) YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start(); - assertNotNull(yarnClient); assertEquals(STATE.STARTED, yarnClient.getServiceState()); - - // get node info nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); - // submit new app - ApplicationSubmissionContext appContext = + ApplicationSubmissionContext appContext = yarnClient.createApplication().getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); - // set the application name appContext.setApplicationName("Test"); - // Set the priority for the application master Priority pri = Priority.newInstance(0); appContext.setPriority(pri); - // Set the queue to which this application is to be submitted in the RM appContext.setQueue("default"); - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = Records - .newRecord(ContainerLaunchContext.class); + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); appContext.setAMContainerSpec(amContainer); - // unmanaged AM appContext.setUnmanagedAM(true); - // Create the request to send to the applications manager - SubmitApplicationRequest appRequest = Records - .newRecord(SubmitApplicationRequest.class); + + SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class); appRequest.setApplicationSubmissionContext(appContext); - // Submit the application to the applications manager yarnClient.submitApplication(appContext); + GenericTestUtils.waitFor(() -> yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(appId).getCurrentAppAttempt().getAppAttemptState() == RMAppAttemptState.LAUNCHED, + 100, 30_000, "Failed to start app"); + appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(appId).getCurrentAppAttempt(); + } - // wait for app to start - int iterationsLeft = 30; - RMAppAttempt appAttempt = null; - while (iterationsLeft > 0) { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - if (appReport.getYarnApplicationState() == - YarnApplicationState.ACCEPTED) { - attemptId = appReport.getCurrentApplicationAttemptId(); - appAttempt = - yarnCluster.getResourceManager().getRMContext().getRMApps() - .get(attemptId.getApplicationId()).getCurrentAppAttempt(); - while (true) { - if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { - break; - } - } - break; - } - sleep(1000); - --iterationsLeft; - } - if (iterationsLeft == 0) { - fail("Application hasn't bee started"); - } - - // Just dig into the ResourceManager and get the AMRMToken just for the sake - // of testing. - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); - UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); - - //creating an instance NMTokenCase - nmTokenCache = new NMTokenCache(); - - // start am rm client - rmClient = - (AMRMClientImpl<ContainerRequest>) AMRMClient - .<ContainerRequest> createAMRMClient(); - - //setting an instance NMTokenCase + private void startRMClient() { + rmClient = (AMRMClientImpl<ContainerRequest>) AMRMClient.createAMRMClient(); rmClient.setNMTokenCache(nmTokenCache); rmClient.init(conf); rmClient.start(); - assertNotNull(rmClient); assertEquals(STATE.STARTED, rmClient.getServiceState()); + } - // start am nm client + private void startNMClient() { nmClient = (NMClientImpl) NMClient.createNMClient(); - - //propagating the AMRMClient NMTokenCache instance nmClient.setNMTokenCache(rmClient.getNMTokenCache()); nmClient.init(conf); nmClient.start(); - assertNotNull(nmClient); assertEquals(STATE.STARTED, nmClient.getServiceState()); } - @After - public void tearDown() { + public void tearDown() throws InterruptedException { rmClient.stop(); yarnClient.stop(); yarnCluster.stop(); } - private void stopNmClient(boolean stopContainers) { - assertNotNull("Null nmClient", nmClient); - // leave one unclosed - assertEquals(1, nmClient.startedContainers.size()); - // default true - assertTrue(nmClient.getCleanupRunningContainers().get()); - nmClient.cleanupRunningContainersOnStop(stopContainers); - assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get()); - nmClient.stop(); - } - - @Test (timeout = 180000) + @Test (timeout = 180_000) public void testNMClientNoCleanupOnStop() - throws YarnException, IOException { - - rmClient.registerApplicationMaster("Host", 10000, ""); + throws YarnException, IOException, InterruptedException, TimeoutException { + runTest(() -> { + stopNmClient(); + assertFalse(nmClient.startedContainers.isEmpty()); + nmClient.cleanupRunningContainers(); + assertEquals(0, nmClient.startedContainers.size()); + }); + } - testContainerManagement(nmClient, allocateContainers(rmClient, 5)); + @Test (timeout = 200_000) + public void testNMClient() + throws YarnException, IOException, InterruptedException, TimeoutException { + runTest(() -> { + // stop the running containers on close + assertFalse(nmClient.startedContainers.isEmpty()); + nmClient.cleanupRunningContainersOnStop(true); + assertTrue(nmClient.getCleanupRunningContainers().get()); + nmClient.stop(); + }); + } - rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); - // don't stop the running containers - stopNmClient(false); - assertFalse(nmClient.startedContainers.isEmpty()); - //now cleanup - nmClient.cleanupRunningContainers(); - assertEquals(0, nmClient.startedContainers.size()); + public void runTest( + Runnable test + ) throws IOException, InterruptedException, YarnException, TimeoutException { + setup(); + rmClient.registerApplicationMaster("Host", 10_000, ""); + testContainerManagement(nmClient, allocateContainers(rmClient)); + rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); + test.run(); + tearDown(); } - @Test (timeout = 200000) - public void testNMClient() - throws YarnException, IOException { - rmClient.registerApplicationMaster("Host", 10000, ""); - - testContainerManagement(nmClient, allocateContainers(rmClient, 5)); - - rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); - // stop the running containers on close - assertFalse(nmClient.startedContainers.isEmpty()); - nmClient.cleanupRunningContainersOnStop(true); + private void stopNmClient() { + assertNotNull("Null nmClient", nmClient); + // leave one unclosed + assertEquals(1, nmClient.startedContainers.size()); + // default true assertTrue(nmClient.getCleanupRunningContainers().get()); + nmClient.cleanupRunningContainersOnStop(false); + assertFalse(nmClient.getCleanupRunningContainers().get()); nmClient.stop(); } private Set<Container> allocateContainers( - AMRMClientImpl<ContainerRequest> rmClient, int num) - throws YarnException, IOException { - // setup container request - Resource capability = Resource.newInstance(1024, 0); - Priority priority = Priority.newInstance(0); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - String[] nodes = new String[] {node}; - String[] racks = new String[] {rack}; - - for (int i = 0; i < num; ++i) { - rmClient.addContainerRequest(new ContainerRequest(capability, nodes, - racks, priority)); + AMRMClientImpl<ContainerRequest> client + ) throws YarnException, IOException { + for (int i = 0; i < NUMBER_OF_CONTAINERS; ++i) { + client.addContainerRequest(new ContainerRequest( + Resource.newInstance(1024, 0), + new String[] {nodeReports.get(0).getNodeId().getHost()}, + new String[] {nodeReports.get(0).getRackName()}, + Priority.newInstance(0) + )); } - - int containersRequestedAny = rmClient.getTable(0) - .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, - capability).remoteRequest.getNumContainers(); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - int iterationsLeft = 2; - Set<Container> containers = new TreeSet<Container>(); - while (allocatedContainerCount < containersRequestedAny - && iterationsLeft > 0) { - AllocateResponse allocResponse = rmClient.allocate(0.1f); - - allocatedContainerCount += allocResponse.getAllocatedContainers().size(); - for(Container container : allocResponse.getAllocatedContainers()) { - containers.add(container); + Set<Container> allocatedContainers = new TreeSet<>(); + while (allocatedContainers.size() < NUMBER_OF_CONTAINERS) { + AllocateResponse allocResponse = client.allocate(0.1f); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + for (NMToken token : allocResponse.getNMTokens()) { + client.getNMTokenCache().setToken(token.getNodeId().toString(), token.getToken()); } - if (!allocResponse.getNMTokens().isEmpty()) { - for (NMToken token : allocResponse.getNMTokens()) { - rmClient.getNMTokenCache().setToken(token.getNodeId().toString(), - token.getToken()); - } + if (allocatedContainers.size() < NUMBER_OF_CONTAINERS) { + sleep(100); } - if(allocatedContainerCount < containersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(1000); - } - - --iterationsLeft; } - return containers; + return allocatedContainers; } - private void testContainerManagement(NMClientImpl client, - Set<Container> containers) throws YarnException, IOException { + private void testContainerManagement( + NMClientImpl client, Set<Container> containers + ) throws YarnException, IOException { int size = containers.size(); int i = 0; for (Container container : containers) { // getContainerStatus shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container - try { - client.getContainerStatus(container.getId(), container.getNodeId()); - fail("Exception is expected"); - } catch (YarnException e) { - assertTrue("The thrown exception is not expected", - e.getMessage().contains("is not handled by this NodeManager")); - } + assertYarnException( + () -> client.getContainerStatus(container.getId(), container.getNodeId()), + IS_NOT_HANDLED_BY_THIS_NODEMANAGER); // upadateContainerResource shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container - try { - client.updateContainerResource(container); - fail("Exception is expected"); - } catch (YarnException e) { - assertTrue("The thrown exception is not expected", - e.getMessage().contains("is not handled by this NodeManager")); - } - + assertYarnException( + () -> client.updateContainerResource(container), + IS_NOT_HANDLED_BY_THIS_NODEMANAGER); // restart shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container - try { - client.restartContainer(container.getId()); - fail("Exception is expected"); - } catch (YarnException e) { - assertTrue("The thrown exception is not expected", - e.getMessage().contains("Unknown container")); - } - + assertYarnException( + () -> client.restartContainer(container.getId()), + UNKNOWN_CONTAINER); // rollback shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container - try { - client.rollbackLastReInitialization(container.getId()); - fail("Exception is expected"); - } catch (YarnException e) { - assertTrue("The thrown exception is not expected", - e.getMessage().contains("Unknown container")); - } - + assertYarnException( + () -> client.rollbackLastReInitialization(container.getId()), + UNKNOWN_CONTAINER); // commit shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container - try { - client.commitLastReInitialization(container.getId()); - fail("Exception is expected"); - } catch (YarnException e) { - assertTrue("The thrown exception is not expected", - e.getMessage().contains("Unknown container")); - } - + assertYarnException( + () -> client.commitLastReInitialization(container.getId()), + UNKNOWN_CONTAINER); // stopContainer shouldn't be called before startContainer, // otherwise, an exception will be thrown - try { - client.stopContainer(container.getId(), container.getNodeId()); - fail("Exception is expected"); - } catch (YarnException e) { - if (!e.getMessage() - .contains("is not handled by this NodeManager")) { - throw new AssertionError("Exception is not expected: ", e); - } - } + assertYarnException( + () -> client.stopContainer(container.getId(), container.getNodeId()), + IS_NOT_HANDLED_BY_THIS_NODEMANAGER); Credentials ts = new Credentials(); DataOutputBuffer dob = new DataOutputBuffer(); ts.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = - ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - ContainerLaunchContext clc = - Records.newRecord(ContainerLaunchContext.class); - if (Shell.WINDOWS) { - clc.setCommands( - Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul")); - } else { - clc.setCommands(Arrays.asList("sleep", "1000000")); - } + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class); + clc.setCommands(Shell.WINDOWS + ? Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul") + : Arrays.asList("sleep", "1000000") + ); clc.setTokens(securityTokens); - try { - client.startContainer(container, clc); - } catch (YarnException e) { - throw new AssertionError("Exception is not expected ", e); - } - List<Integer> exitStatuses = Collections.singletonList(-1000); + client.startContainer(container, clc); + List<Integer> exitStatuses = Arrays.asList(-1000, -105); // leave one container unclosed if (++i < size) { testContainer(client, i, container, clc, exitStatuses); - } } } private void testContainer(NMClientImpl client, int i, Container container, ContainerLaunchContext clc, List<Integer> exitCode) - throws YarnException, IOException { - // NodeManager may still need some time to make the container started + throws YarnException, IOException { testGetContainerStatus(container, i, ContainerState.RUNNING, "", - exitCode); - waitForContainerTransitionCount(container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING, 1); - // Test increase container API and make sure requests can reach NM + exitCode); + waitForContainerRunningTransitionCount(container, 1); testIncreaseContainerResource(container); - - testRestartContainer(container.getId()); + testRestartContainer(container); testGetContainerStatus(container, i, ContainerState.RUNNING, - "will be Restarted", exitCode); - waitForContainerTransitionCount(container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING, 2); - + "will be Restarted", exitCode); + waitForContainerRunningTransitionCount(container, 2); if (i % 2 == 0) { - testReInitializeContainer(container.getId(), clc, false); + testReInitializeContainer(container, clc, false); testGetContainerStatus(container, i, ContainerState.RUNNING, - "will be Re-initialized", exitCode); - waitForContainerTransitionCount(container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING, 3); - - testRollbackContainer(container.getId(), false); + "will be Re-initialized", exitCode); + waitForContainerRunningTransitionCount(container, 3); + testContainerRollback(container, true); testGetContainerStatus(container, i, ContainerState.RUNNING, - "will be Rolled-back", exitCode); - waitForContainerTransitionCount(container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING, 4); - - testCommitContainer(container.getId(), true); - testReInitializeContainer(container.getId(), clc, false); + "will be Rolled-back", exitCode); + waitForContainerRunningTransitionCount(container, 4); + testContainerCommit(container, false); + testReInitializeContainer(container, clc, false); testGetContainerStatus(container, i, ContainerState.RUNNING, - "will be Re-initialized", exitCode); - waitForContainerTransitionCount(container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING, 5); - testCommitContainer(container.getId(), false); + "will be Re-initialized", exitCode); + waitForContainerRunningTransitionCount(container, 5); + testContainerCommit(container, true); } else { - testReInitializeContainer(container.getId(), clc, true); + testReInitializeContainer(container, clc, true); testGetContainerStatus(container, i, ContainerState.RUNNING, - "will be Re-initialized", exitCode); - waitForContainerTransitionCount(container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING, 3); - testRollbackContainer(container.getId(), true); - testCommitContainer(container.getId(), true); + "will be Re-initialized", exitCode); + waitForContainerRunningTransitionCount(container, 3); + testContainerRollback(container, false); + testContainerCommit(container, false); } - - try { - client.stopContainer(container.getId(), container.getNodeId()); - } catch (YarnException e) { - throw (AssertionError) - (new AssertionError("Exception is not expected: " + e, e)); - } - - // getContainerStatus can be called after stopContainer - try { - // O is possible if CLEANUP_CONTAINER is executed too late - // -105 is possible if the container is not terminated but killed - testGetContainerStatus(container, i, ContainerState.COMPLETE, - "Container killed by the ApplicationMaster.", - Arrays.asList( - ContainerExitStatus.KILLED_BY_APPMASTER, - ContainerExitStatus.SUCCESS)); - } catch (YarnException e) { - // The exception is possible because, after the container is stopped, - // it may be removed from NM's context. - if (!e.getMessage() - .contains("was recently stopped on node manager")) { - throw (AssertionError) - (new AssertionError("Exception is not expected: ", e)); - } - } - } - - /** - * Wait until the container reaches a state N times. - * @param container container to watch - * @param state state to test - * @param transitions the number N above - * @throws YarnException This happens if the test times out while waiting - */ - private void waitForContainerTransitionCount( - Container container, - org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState state, long transitions) - throws YarnException { - long transitionCount = -1; - do { - if (transitionCount != -1) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new YarnException( - "Timeout at transition count:" + transitionCount, e); - } - } - transitionCount = DebugSumContainerStateListener - .getTransitionCounter(container.getId(), state); - } while (transitionCount != transitions); + client.stopContainer(container.getId(), container.getNodeId()); + testGetContainerStatus(container, i, ContainerState.COMPLETE, + "killed by the ApplicationMaster", exitCode); } - private void sleep(int sleepTime) { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - e.printStackTrace(); + private void waitForContainerRunningTransitionCount(Container container, long transitions) { + while (DebugSumContainerStateListener.RUNNING_TRANSITIONS + .getOrDefault(container.getId(), 0) != transitions) { + sleep(500); } } + private void testGetContainerStatus(Container container, int index, - ContainerState state, String diagnostics, List<Integer> exitStatuses) + ContainerState state, String diagnostics, + List<Integer> exitStatuses) throws YarnException, IOException { while (true) { sleep(250); ContainerStatus status = nmClient.getContainerStatus( - container.getId(), container.getNodeId()); + container.getId(), container.getNodeId()); // NodeManager may still need some time to get the stable // container status if (status.getState() == state) { assertEquals(container.getId(), status.getContainerId()); - assertTrue("" + index + ": " + status.getDiagnostics(), - status.getDiagnostics().contains(diagnostics)); + assertTrue(index + ": " + status.getDiagnostics(), + status.getDiagnostics().contains(diagnostics)); assertTrue("Exit Statuses are supposed to be in: " + exitStatuses + - ", but the actual exit status code is: " + - status.getExitStatus(), - exitStatuses.contains(status.getExitStatus())); + ", but the actual exit status code is: " + + status.getExitStatus(), + exitStatuses.contains(status.getExitStatus())); break; } } } @SuppressWarnings("deprecation") - private void testIncreaseContainerResource(Container container) - throws YarnException, IOException { - try { - nmClient.increaseContainerResource(container); - } catch (YarnException e) { - // NM container increase container resource should fail without a version - // increase action to fail. - if (!e.getMessage().contains( - container.getId() + " has update version ")) { - throw (AssertionError) - (new AssertionError("Exception is not expected: " + e) - .initCause(e)); - } - } + private void testIncreaseContainerResource(Container container) { + assertYarnException( + () -> nmClient.increaseContainerResource(container), + container.getId() + " has update version "); } - private void testRestartContainer(ContainerId containerId) - throws YarnException, IOException { - try { - sleep(250); - nmClient.restartContainer(containerId); - sleep(250); - } catch (YarnException e) { - // NM container will only be in SCHEDULED state, so expect the increase - // action to fail. - if (!e.getMessage().contains( - "can only be changed when a container is in RUNNING state")) { - throw (AssertionError) - (new AssertionError("Exception is not expected: " + e) - .initCause(e)); - } - } + private void testRestartContainer(Container container) throws IOException, YarnException { + nmClient.restartContainer(container.getId()); } - private void testRollbackContainer(ContainerId containerId, - boolean notRollbackable) throws YarnException, IOException { - try { - sleep(250); - nmClient.rollbackLastReInitialization(containerId); - if (notRollbackable) { - fail("Should not be able to rollback.."); - } - sleep(250); - } catch (YarnException e) { - // NM container will only be in SCHEDULED state, so expect the increase - // action to fail. - if (notRollbackable) { - Assert.assertTrue(e.getMessage().contains( - "Nothing to rollback to")); - } else { - if (!e.getMessage().contains( - "can only be changed when a container is in RUNNING state")) { - throw (AssertionError) - (new AssertionError("Exception is not expected: " + e) - .initCause(e)); - } - } + private void testContainerRollback(Container container, boolean enabled) + throws IOException, YarnException { + if (enabled) { + nmClient.rollbackLastReInitialization(container.getId()); + } else { + assertYarnException( + () -> nmClient.rollbackLastReInitialization(container.getId()), + "Nothing to rollback to"); } } - private void testCommitContainer(ContainerId containerId, - boolean notCommittable) throws YarnException, IOException { - try { - nmClient.commitLastReInitialization(containerId); - if (notCommittable) { - fail("Should not be able to commit.."); - } - } catch (YarnException e) { - // NM container will only be in SCHEDULED state, so expect the increase - // action to fail. - if (notCommittable) { - Assert.assertTrue(e.getMessage().contains( - "Nothing to Commit")); - } else { - if (!e.getMessage().contains( - "can only be changed when a container is in RUNNING state")) { - throw (AssertionError) - (new AssertionError("Exception is not expected: " + e) - .initCause(e)); - } - } + private void testContainerCommit(Container container, boolean enabled) + throws IOException, YarnException { + if (enabled) { + nmClient.commitLastReInitialization(container.getId()); + } else { + assertYarnException( + () -> nmClient.commitLastReInitialization(container.getId()), + "Nothing to Commit"); } } - private void testReInitializeContainer(ContainerId containerId, - ContainerLaunchContext clc, boolean autoCommit) - throws YarnException, IOException { + private void testReInitializeContainer( + Container container, ContainerLaunchContext clc, boolean autoCommit + ) throws IOException, YarnException { + nmClient.reInitializeContainer(container.getId(), clc, autoCommit); + } + + private void assertYarnException(ThrowingRunnable runnable, String text) { + YarnException e = assertThrows(YarnException.class, runnable); + assertTrue(String.format("The thrown exception is not expected cause it has text [%s]" + + ", what not contains text [%s]", e.getMessage(), text), e.getMessage().contains(text)); + } + + private void sleep(int sleepTime) { try { - nmClient.reInitializeContainer(containerId, clc, autoCommit); - } catch (YarnException e) { - // NM container will only be in SCHEDULED state, so expect the increase - // action to fail. - if (!e.getMessage().contains( - "can only be changed when a container is in RUNNING state")) { - throw (AssertionError) - (new AssertionError("Exception is not expected: " + e) - .initCause(e)); - } + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org