[ https://issues.apache.org/jira/browse/YARN-5607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635962#comment-17635962 ]
ASF GitHub Bot commented on YARN-5607: -------------------------------------- K0K0V0K commented on code in PR #5119: URL: https://github.com/apache/hadoop/pull/5119#discussion_r1026644072 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final 
int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); Review Comment: I think "App not found in the RMContext" would be more informative ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); Review Comment: I think "App not found in the RMContext" text can be used here as well ########## 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); Review Comment: I think this can be private as well, and we should use the CommonUtil.class for logger ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package 
org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. 
+ * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. + * + * @param nm A mock nodemanager + * @param containerId the id of a container + * @param containerState the container state waited + * @return if reach the state before timeout; false otherwise. + * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, MockNM nm, ContainerId containerId, + RMContainerState containerState) throws Exception { + return waitForState(rm, nm, containerId, containerState, + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is specified by the parameter. + * + * @param nm A mock nodemanager + * @param containerId the id of a container + * @param containerState the container state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @return if reach the state before timeout; false otherwise. + * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, MockNM nm, ContainerId containerId, + RMContainerState containerState, int timeoutMsecs) throws Exception { + return waitForState(rm, Arrays.asList(nm), containerId, containerState, + timeoutMsecs); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. + * + * @param nms array of mock nodemanagers + * @param containerId the id of a container + * @param containerState the container state waited + * @return if reach the state before timeout; false otherwise. 
+ * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, Collection<MockNM> nms, ContainerId containerId, + RMContainerState containerState) throws Exception { + return waitForState(rm, nms, containerId, containerState, + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is specified by the parameter. + * + * @param nms array of mock nodemanagers + * @param containerId the id of a container + * @param containerState the container state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @return if reach the state before timeout; false otherwise. + * @throws Exception if interrupted while waiting for the state transition + * or an unexpected error while MockNM is hearbeating. + */ + public static boolean waitForState(MockRM rm, Collection<MockNM> nms, ContainerId containerId, + RMContainerState containerState, int timeoutMsecs) throws Exception { + rm.drainEventsImplicitly(); + RMContainer container = rm.getResourceScheduler().getRMContainer(containerId); + int timeWaiting = 0; + while (container == null) { + if (timeWaiting >= timeoutMsecs) { + return false; + } + + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } + rm.drainEventsImplicitly(); + container = rm.getResourceScheduler().getRMContainer(containerId); + LOG.info("Waiting for container " + containerId + " to be " + + containerState + ", container is null right now."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + while (!containerState.equals(container.getState())) { + if (timeWaiting >= timeoutMsecs) { + return false; + } + + LOG.info("Container : " + containerId + " State is : " + + container.getState() + " Waiting for state : " + containerState); + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } + rm.drainEventsImplicitly(); + 
Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Container State is : " + container.getState()); + return true; + } + + /** + * Wait until a node has reached a specified state. + * The timeout is 20 seconds. + * + * @param nodeId the id of a node + * @param finalState the node state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, NodeId nodeId, NodeState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + RMNode node = rm.getRMNode(nodeId); + while (node == null) { + if (timeWaiting >= TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + break; + } + node = rm.getRMNode(nodeId); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + Assert.assertNotNull("node shouldn't be null (timedout)", node); + while (!finalState.equals(node.getState())) { + if (timeWaiting >= TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + break; + } + + LOG.info("Node State is : " + node.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Node " + nodeId + " State is : " + node.getState()); + Assert.assertEquals("Node state is not correct (timedout)", finalState, + node.getState()); + } + + @SuppressWarnings("rawtypes") + public static void waitForSchedulerAppAttemptAdded( + ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException { + int tick = 0; + rm.drainEventsImplicitly(); + // Wait for at most 5 sec + while (null == ((AbstractYarnScheduler) rm.getResourceScheduler()) + .getApplicationAttempt(attemptId) && tick < 50) { + Thread.sleep(100); + if (tick % 10 == 0) { + LOG.info("waiting for SchedulerApplicationAttempt=" + + attemptId + " added."); + } + tick++; + } + Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" + + attemptId + " to be added.", ((AbstractYarnScheduler) + 
rm.getResourceScheduler()).getApplicationAttempt(attemptId)); + } + + public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) + throws Exception { + waitForState(rm, app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); + waitForState(rm, attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + return attempt; + } + + /** + * Wait until an app removed from scheduler. + * The timeout is 40 seconds. + * @param appId the id of an app + * @throws InterruptedException + * if interrupted while waiting for app removed + */ + public static void waitForAppRemovedFromScheduler(MockRM rm, ApplicationId appId) + throws InterruptedException { + int timeWaiting = 0; + rm.drainEventsImplicitly(); + Map<ApplicationId, SchedulerApplication> apps = + ((AbstractYarnScheduler) rm.getResourceScheduler()) + .getSchedulerApplications(); + while (apps.containsKey(appId)) { + if (timeWaiting >= TIMEOUT_MS_FOR_APP_REMOVED) { + break; + } + LOG.info("wait for app removed, " + appId); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + Assert.assertTrue("app is not removed from scheduler (timeout).", + !apps.containsKey(appId)); + LOG.info("app is removed from scheduler, " + appId); + } + + /** + * Wait until a container has reached a completion state. 
+ * @param rm A mock resourcemanager + * @param nm A mock nodemanager + * @param amContainerId A containerId for AM + * @param container Resourcemanager's view of an application container + */ + public static void waitforContainerCompletion(MockRM rm, MockNM nm, + ContainerId amContainerId, RMContainer container) throws Exception { + ContainerId containerId = container.getContainerId(); + if (null != rm.scheduler.getRMContainer(containerId)) { + if (containerId.equals(amContainerId)) { + CommonUtil.waitForState(rm, nm, containerId, RMContainerState.COMPLETED); + } else { + CommonUtil.waitForState(rm, nm, containerId, RMContainerState.KILLED); + } Review Comment: I think this can be replaced with a ternary operator, so we can reduce the code here. ``` RMContainerState finalState = containerId.equals(amContainerId) ? RMContainerState.COMPLETED : RMContainerState.KILLED; CommonUtil.waitForState(rm, nm, containerId, finalState); ``` ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); + private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. 
+ * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. Review Comment: i think this is 20 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { + static final Logger LOG = LoggerFactory.getLogger(MockRM.class); 
+ private static final int SECOND = 1000; + private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; + private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; + private static final int WAIT_MS_PER_LOOP = 10; + + /** + * Wait until an application has reached a specified state. + * The timeout is 80 seconds. + * + * @param appId the id of an application + * @param finalState the application state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationId appId, RMAppState finalState) + throws InterruptedException { + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalState.equals(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertEquals("App State is not correct (timeout).", finalState, + app.getState()); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(rm, attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attemptId the id of an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + private static void waitForState(MockRM rm, ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + rm.start(); + rm.drainEventsImplicitly(); + RMApp app = rm.getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + waitForState(attempt, finalState, timeoutMsecs); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout is 40 seconds. + * + * @param attempt an attempt + * @param finalState the attempt state waited + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState) throws InterruptedException { + waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); + } + + /** + * Wait until an attempt has reached a specified state. + * The timeout can be specified by the parameter. 
+ * + * @param attempt an attempt + * @param finalState the attempt state waited + * @param timeoutMsecs the length of timeout in milliseconds + * @throws InterruptedException if interrupted while waiting for the state transition + */ + public static void waitForState(RMAppAttempt attempt, + RMAppAttemptState finalState, int timeoutMsecs) + throws InterruptedException { + int timeWaiting = 0; + while (finalState != attempt.getAppAttemptState()) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timeout).", finalState, + attempt.getState()); + } + + public static void waitForContainerToComplete(MockRM rm, RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + rm.drainEventsImplicitly(); + int timeWaiting = 0; + while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { + List<ContainerStatus> containers = attempt.getJustFinishedContainers(); + LOG.info("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + } + + public static MockAM waitForNewAMToLaunchAndRegister(MockRM rm, ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = rm.getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + int timeWaiting = 0; + while (app.getAppAttempts().size() != attemptSize) { + if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) { + break; + } + LOG.info("Application " + appId + + " is waiting for AM to restart. 
Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + return rm.launchAndRegisterAM(app, rm, nm); + } + + /** + * Wait until a container has reached a specified state. + * The timeout is 10 seconds. Review Comment: maybe we can write this instead of hardcoded values `The timeout is {@link #TIMEOUT_MS_FOR_CONTAINER_AND_NODE} milliseconds.` ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java: ########## @@ -30,11 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.MockAM; -import org.apache.hadoop.yarn.server.resourcemanager.MockNM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; -import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; +import org.apache.hadoop.yarn.server.resourcemanager.*; Review Comment: Can you please revert this import change? 
https://peterdev.pl/why-you-should-avoid-star-imports-in-intellij-idea/ ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/CommonUtil.java: ########## @@ -0,0 +1,410 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class CommonUtil { Review Comment: Because this is a public class that is visible from all of the test classes, I think CommonUtil is too abstract a name for it. Maybe **ResourceManagerTesterUtils** would be a more informative name. 
> Document TestContainerResourceUsage#waitForContainerCompletion > -------------------------------------------------------------- > > Key: YARN-5607 > URL: https://issues.apache.org/jira/browse/YARN-5607 > Project: Hadoop YARN > Issue Type: Test > Components: resourcemanager, test > Affects Versions: 2.9.0 > Reporter: Karthik Kambatla > Assignee: Susheel Gupta > Priority: Major > Labels: newbie, pull-request-available > > The logic in TestContainerResourceUsage#waitForContainerCompletion > (introduced in YARN-5024) is not immediately obvious. It could use some > documentation. Also, this seems like a useful helper method. Should this be > moved to one of the mock classes or to a util class? -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org