K0K0V0K commented on code in PR #5119: URL: https://github.com/apache/hadoop/pull/5119#discussion_r1026644072
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait 
until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); Review Comment: I think "App not found in the RMContext" would be more informative ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); Review Comment: I think "App not found in the RMContext" text can be used here as well ########## 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); Review Comment: I think this can be private as well, and we should use the CommonUtil.class for logger ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package 
org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. 
+ * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. + * + * @param nm A mock nodemanager + * @param containerId the id of a container + * @param containerState the container state waited + * @return if reach the state before timeout; false otherwise. + * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, MockNM nm, ContainerId containerId, + RMContainerState containerState) throws Exception { + return waitForState(rm, nm, containerId, containerState, + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is specified by the parameter. + * + * @param nm A mock nodemanager + * @param containerId the id of a container + * @param containerState the container state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @return if reach the state before timeout; false otherwise. + * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, MockNM nm, ContainerId containerId, + RMContainerState containerState, int timeoutMsecs) throws Exception { + return waitForState(rm, Arrays.asList(nm), containerId, containerState, + timeoutMsecs); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. + * + * @param nms array of mock nodemanagers + * @param containerId the id of a container + * @param containerState the container state waited + * @return if reach the state before timeout; false otherwise. 
+ * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, Collection<MockNM> nms, ContainerId containerId, + RMContainerState containerState) throws Exception { + return waitForState(rm, nms, containerId, containerState, + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is specified by the parameter. + * + * @param nms array of mock nodemanagers + * @param containerId the id of a container + * @param containerState the container state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @return if reach the state before timeout; false otherwise. + * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, Collection<MockNM> nms, ContainerId containerId, + RMContainerState containerState, int timeoutMsecs) throws Exception { + rm.drainEventsImplicitly(); + RMContainer container = rm.getResourceScheduler().getRMContainer(containerId); + int timeWaiting = 0; + while (container == null) { + if (timeWaiting >= timeoutMsecs) { + return false; + } + + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } + rm.drainEventsImplicitly(); + container = rm.getResourceScheduler().getRMContainer(containerId); + LOG.info("Waiting for container " + containerId + " to be " + + containerState + ", container is null right now."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + while (!containerState.equals(container.getState())) { + if (timeWaiting >= timeoutMsecs) { + return false; + } + + LOG.info("Container : " + containerId + " State is : " + + container.getState() + " Waiting for state : " + containerState); + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } + rm.drainEventsImplicitly(); + 
Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Container State is : " + container.getState()); + return true; + } + + /** + * Wait until a node has reached a specified state. + * The timeout is 20 seconds. + * + * @param nodeId the id of a node + * @param finalState the node state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, NodeId nodeId, NodeState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + RMNode node = rm.getRMNode(nodeId); + while (node == null) { + if (timeWaiting >= TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + break; + } + node = rm.getRMNode(nodeId); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + Assert.assertNotNull("node shouldn't be null (timedout)", node); + while (!finalState.equals(node.getState())) { + if (timeWaiting >= TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + break; + } + + LOG.info("Node State is : " + node.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Node " + nodeId + " State is : " + node.getState()); + Assert.assertEquals("Node state is not correct (timedout)", finalState, + node.getState()); + } + + @SuppressWarnings("rawtypes") + public static void waitForSchedulerAppAttemptAdded( + ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException { + int tick = 0; + rm.drainEventsImplicitly(); + // Wait for at most 5 sec + while (null == ((AbstractYarnScheduler) rm.getResourceScheduler()) + .getApplicationAttempt(attemptId) && tick < 50) { + Thread.sleep(100); + if (tick % 10 == 0) { + LOG.info("waiting for SchedulerApplicationAttempt=" + + attemptId + " added."); + } + tick++; + } + Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" + + attemptId + " to be added.", ((AbstractYarnScheduler) + 
rm.getResourceScheduler()).getApplicationAttempt(attemptId)); + } + + public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) + throws Exception { + waitForState(rm, app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); + waitForState(rm, attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + return attempt; + } + + /** + * Wait until an app removed from scheduler. + * The timeout is 40 seconds. + * @param appId the id of an app + * @throws InterruptedException + * if interrupted while waiting for app removed + */ + public static void waitForAppRemovedFromScheduler(MockRM rm, ApplicationId appId) + throws InterruptedException { + int timeWaiting = 0; + rm.drainEventsImplicitly(); + Map<ApplicationId, SchedulerApplication> apps = + ((AbstractYarnScheduler) rm.getResourceScheduler()) + .getSchedulerApplications(); + while (apps.containsKey(appId)) { + if (timeWaiting >= TIMEOUT_MS_FOR_APP_REMOVED) { + break; + } + LOG.info("wait for app removed, " + appId); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + Assert.assertTrue("app is not removed from scheduler (timeout).", + !apps.containsKey(appId)); + LOG.info("app is removed from scheduler, " + appId); + } + + /** + * Wait until a container has reached a completion state. 
+ * @param rm A mock resourcemanager + * @param nm A mock nodemanager + * @param amContainerId A containerId for AM + * @param container Resourcemanager's view of an application container + */ + public static void waitforContainerCompletion(MockRM rm, MockNM nm, + ContainerId amContainerId, RMContainer container) throws Exception { + ContainerId containerId = container.getContainerId(); + if (null != rm.scheduler.getRMContainer(containerId)) { + if (containerId.equals(amContainerId)) { + CommonUtil.waitForState(rm, nm, containerId, RMContainerState.COMPLETED); + } else { + CommonUtil.waitForState(rm, nm, containerId, RMContainerState.KILLED); + } Review Comment: I think this can be replaced with a ternary operator to reduce the code here: ``` RMContainerState finalState = containerId.equals(amContainerId) ? RMContainerState.COMPLETED : RMContainerState.KILLED; CommonUtil.waitForState(rm, nm, containerId, finalState); ``` ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. 
+ * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. Review Comment: I think this is 20 seconds, not 10 (`TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND`). ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class);
+ private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. Review Comment: Maybe we could write this instead of hard-coding the value: `The timeout is {@link #TIMEOUT_MS_FOR_CONTAINER_AND_NODE} milliseconds.` ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java: ########## @@ -30,11 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.MockAM; -import org.apache.hadoop.yarn.server.resourcemanager.MockNM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; -import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; +import org.apache.hadoop.yarn.server.resourcemanager.*; Review Comment: Can you please revert this import change?
https://peterdev.pl/why-you-should-avoid-star-imports-in-intellij-idea/ ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { Review Comment: Because this is a public class that is visible to all of the test classes, I think CommonUtil is too generic a name for it. Maybe **ResourceManagerTesterUtils** would be a more informative name. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org