susheel-gupta commented on code in PR #5119: URL: https://github.com/apache/hadoop/pull/5119#discussion_r1027642975
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait 
until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. Review Comment: @K0K0V0K The timeout is specified by the parameter. They already mentioned 10; maybe it was a calculated or general timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org