YARN-5375. invoke MockRM#drainEvents implicitly in MockRM methods to reduce test failures. Contributed by sandflee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6560351
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6560351
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6560351

Branch: refs/heads/HADOOP-13345
Commit: d65603517e52843f11cd9d3b6f6e28fca9336ee3
Parents: 61c0bed
Author: Rohith Sharma K S <rohithsharm...@apache.org>
Authored: Wed Nov 16 15:14:00 2016 +0530
Committer: Rohith Sharma K S <rohithsharm...@apache.org>
Committed: Wed Nov 16 15:14:00 2016 +0530

----------------------------------------------------------------------
 .../hadoop/yarn/event/DrainDispatcher.java | 15 ++-
 .../resourcemanager/recovery/RMStateStore.java | 24 +++--
 .../yarn/server/resourcemanager/MockRM.java | 97 ++++++++++++++++++--
 .../resourcemanager/TestApplicationCleanup.java | 15 ---
 .../TestNodeBlacklistingOnAMFailures.java | 14 ---
 .../yarn/server/resourcemanager/TestRM.java | 8 +-
 .../server/resourcemanager/TestRMRestart.java | 1 +
 .../TestAMRMRPCNodeUpdates.java | 13 ---
 .../scheduler/TestAbstractYarnScheduler.java | 2 +-
 .../scheduler/fair/TestFairScheduler.java | 7 ++
 .../webapp/TestRMWebServicesNodes.java | 1 +
 11 files changed, 134 insertions(+), 63 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index f769492..1369465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.event; +import org.apache.hadoop.conf.Configuration; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -37,6 +39,13 @@ public class DrainDispatcher extends AsyncDispatcher { this.mutex = this; } + @Override + public void serviceInit(Configuration conf) + throws Exception { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false); + super.serviceInit(conf); + } + /** * Wait till event thread enters WAITING state (i.e. waiting for new events). */ @@ -50,7 +59,7 @@ public class DrainDispatcher extends AsyncDispatcher { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!drained) { + while (!isDrained()) { Thread.yield(); } } @@ -96,7 +105,9 @@ public class DrainDispatcher extends AsyncDispatcher { @Override protected boolean isDrained() { - return drained; + synchronized (mutex) { + return drained; + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index a6527d8..0fd346f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -671,14 +671,18 @@ public abstract class RMStateStore extends AbstractService { } AsyncDispatcher dispatcher; + @SuppressWarnings("rawtypes") + @VisibleForTesting + protected EventHandler rmStateStoreEventHandler; @Override protected void serviceInit(Configuration conf) throws Exception{ // create async handler dispatcher = new AsyncDispatcher(); dispatcher.init(conf); + rmStateStoreEventHandler = new ForwardingEventHandler(); dispatcher.register(RMStateStoreEventType.class, - new ForwardingEventHandler()); + rmStateStoreEventHandler); dispatcher.setDrainEventsOnStop(); initInternal(conf); } @@ -790,12 +794,12 @@ public abstract class RMStateStore extends AbstractService { ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), context, app.getUser(), app.getCallerContext()); appState.setApplicationTimeouts(app.getApplicationTimeouts()); - dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); + getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") public void updateApplicationState(ApplicationStateData appState) { - dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); + getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState)); } public void updateApplicationStateSynchronously(ApplicationStateData appState, @@ -842,14 +846,14 @@ public abstract class RMStateStore extends AbstractService { attempMetrics.getPreemptedVcore() ); - dispatcher.getEventHandler().handle( + getRMStateStoreEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); } @SuppressWarnings("unchecked") public void updateApplicationAttemptState( ApplicationAttemptStateData attemptState) { - dispatcher.getEventHandler().handle( + getRMStateStoreEventHandler().handle( new 
RMStateUpdateAppAttemptEvent(attemptState)); } @@ -1021,7 +1025,8 @@ public abstract class RMStateStore extends AbstractService { appState.attempts.put(appAttempt.getAppAttemptId(), null); } - dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + getRMStateStoreEventHandler().handle( + new RMStateStoreRemoveAppEvent(appState)); } /** @@ -1042,7 +1047,7 @@ public abstract class RMStateStore extends AbstractService { @SuppressWarnings("unchecked") public synchronized void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId) { - dispatcher.getEventHandler().handle( + getRMStateStoreEventHandler().handle( new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId)); } @@ -1211,4 +1216,9 @@ public abstract class RMStateStore extends AbstractService { this.readLock.unlock(); } } + + @SuppressWarnings("rawtypes") + protected EventHandler getRMStateStoreEventHandler() { + return dispatcher.getEventHandler(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3861624..ea573e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -67,6 +67,7 @@ import 
org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -74,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -93,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -117,6 +121,7 @@ public class MockRM extends ResourceManager { private static 
final int WAIT_MS_PER_LOOP = 10; private final boolean useNullRMNodeLabelsManager; + private boolean disableDrainEventsImplicitly; public MockRM() { this(new YarnConfiguration()); @@ -135,13 +140,41 @@ public class MockRM extends ResourceManager { super(); this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); - if(store != null) { + if (store != null) { setRMStateStore(store); + } else { + Class storeClass = getRMContext().getStateStore().getClass(); + if (storeClass.equals(MemoryRMStateStore.class)) { + MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore(); + mockStateStore.init(conf); + setRMStateStore(mockStateStore); + } else if (storeClass.equals(NullRMStateStore.class)) { + MockRMNullStateStore mockStateStore = new MockRMNullStateStore(); + mockStateStore.init(conf); + setRMStateStore(mockStateStore); + } } Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); + disableDrainEventsImplicitly = false; } - + + public class MockRMMemoryStateStore extends MemoryRMStateStore { + @SuppressWarnings("rawtypes") + @Override + protected EventHandler getRMStateStoreEventHandler() { + return rmStateStoreEventHandler; + } + } + + public class MockRMNullStateStore extends NullRMStateStore { + @SuppressWarnings("rawtypes") + @Override + protected EventHandler getRMStateStoreEventHandler() { + return rmStateStoreEventHandler; + } + } + @Override protected RMNodeLabelsManager createNodeLabelManager() throws InstantiationException, IllegalAccessException { @@ -159,6 +192,16 @@ public class MockRM extends ResourceManager { return new DrainDispatcher(); } + @Override + protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { + return new EventHandler<SchedulerEvent>() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + public void drainEvents() { Dispatcher rmDispatcher = 
getRmDispatcher(); if (rmDispatcher instanceof DrainDispatcher) { @@ -170,6 +213,7 @@ public class MockRM extends ResourceManager { private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates) throws InterruptedException { + drainEventsImplicitly(); RMApp app = getRMContext().getRMApps().get(appId); Assert.assertNotNull("app shouldn't be null", app); final int timeoutMsecs = 80 * SECOND; @@ -200,6 +244,7 @@ public class MockRM extends ResourceManager { */ public void waitForState(ApplicationId appId, RMAppState finalState) throws InterruptedException { + drainEventsImplicitly(); RMApp app = getRMContext().getRMApps().get(appId); Assert.assertNotNull("app shouldn't be null", app); final int timeoutMsecs = 80 * SECOND; @@ -245,6 +290,7 @@ public class MockRM extends ResourceManager { public void waitForState(ApplicationAttemptId attemptId, RMAppAttemptState finalState, int timeoutMsecs) throws InterruptedException { + drainEventsImplicitly(); RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); @@ -295,6 +341,7 @@ public class MockRM extends ResourceManager { public void waitForContainerToComplete(RMAppAttempt attempt, NMContainerStatus completedContainer) throws InterruptedException { + drainEventsImplicitly(); int timeWaiting = 0; while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { List<ContainerStatus> containers = attempt.getJustFinishedContainers(); @@ -394,6 +441,7 @@ public class MockRM extends ResourceManager { */ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId, RMContainerState containerState, int timeoutMsecs) throws Exception { + drainEventsImplicitly(); RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeWaiting = 0; while (container == null) { @@ -404,6 +452,7 @@ public class MockRM extends ResourceManager { for (MockNM nm : nms) { 
nm.nodeHeartbeat(true); } + drainEventsImplicitly(); container = getResourceScheduler().getRMContainer(containerId); LOG.info("Waiting for container " + containerId + " to be " + containerState + ", container is null right now."); @@ -421,6 +470,7 @@ public class MockRM extends ResourceManager { for (MockNM nm : nms) { nm.nodeHeartbeat(true); } + drainEventsImplicitly(); Thread.sleep(WAIT_MS_PER_LOOP); timeWaiting += WAIT_MS_PER_LOOP; } @@ -698,7 +748,7 @@ public class MockRM extends ResourceManager { public MockNM registerNode(String nodeIdStr, int memory) throws Exception { MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -707,7 +757,7 @@ public class MockRM extends ResourceManager { MockNM nm = new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -717,7 +767,7 @@ public class MockRM extends ResourceManager { new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), YarnVersionInfo.getVersion()); nm.registerNode(runningApplications); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -725,12 +775,14 @@ public class MockRM extends ResourceManager { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + drainEventsImplicitly(); } public void sendNodeLost(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); + drainEventsImplicitly(); } /** @@ -743,6 +795,7 @@ public class MockRM extends ResourceManager { */ public void waitForState(NodeId nodeId, NodeState finalState) throws InterruptedException { + drainEventsImplicitly(); RMNode node = getRMContext().getRMNodes().get(nodeId); if (node == null) { node = 
getRMContext().getInactiveRMNodes().get(nodeId); @@ -774,7 +827,9 @@ public class MockRM extends ResourceManager { public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); - return client.forceKillApplication(req); + KillApplicationResponse response = client.forceKillApplication(req); + drainEventsImplicitly(); + return response; } public FailApplicationAttemptResponse failApplicationAttempt( @@ -782,7 +837,10 @@ public class MockRM extends ResourceManager { ApplicationClientProtocol client = getClientRMService(); FailApplicationAttemptRequest req = FailApplicationAttemptRequest.newInstance(attemptId); - return client.failApplicationAttempt(req); + FailApplicationAttemptResponse response = + client.failApplicationAttempt(req); + drainEventsImplicitly(); + return response; } /** @@ -807,6 +865,7 @@ public class MockRM extends ResourceManager { .getEventHandler() .handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); + drainEventsImplicitly(); return am; } @@ -817,6 +876,7 @@ public class MockRM extends ResourceManager { getRMContext().getDispatcher().getEventHandler() .handle(new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); + drainEventsImplicitly(); } @Override @@ -966,6 +1026,7 @@ public class MockRM extends ResourceManager { am.unregisterAppAttempt(req,true); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING); nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm.drainEventsImplicitly(); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); } @@ -974,6 +1035,7 @@ public class MockRM extends ResourceManager { private static void waitForSchedulerAppAttemptAdded( ApplicationAttemptId attemptId, MockRM rm) throws 
InterruptedException { int tick = 0; + rm.drainEventsImplicitly(); // Wait for at most 5 sec while (null == ((AbstractYarnScheduler) rm.getResourceScheduler()) .getApplicationAttempt(attemptId) && tick < 50) { @@ -1015,9 +1077,11 @@ public class MockRM extends ResourceManager { */ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEventsImplicitly(); RMAppAttempt attempt = waitForAttemptScheduled(app, rm); LOG.info("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); + rm.drainEventsImplicitly(); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); return am; @@ -1025,12 +1089,14 @@ public class MockRM extends ResourceManager { public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEventsImplicitly(); // UAMs go directly to LAUNCHED state rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); LOG.info("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); + rm.drainEventsImplicitly(); MockAM am = new MockAM(rm.getRMContext(), rm.masterService, attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -1067,6 +1133,7 @@ public class MockRM extends ResourceManager { throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); client.updateReservation(request); + drainEventsImplicitly(); } // Explicitly reset queue metrics for testing. 
@@ -1087,6 +1154,7 @@ public class MockRM extends ResourceManager { SignalContainerRequest req = SignalContainerRequest.newInstance(containerId, command); client.signalToContainer(req); + drainEventsImplicitly(); } /** @@ -1099,6 +1167,7 @@ public class MockRM extends ResourceManager { public void waitForAppRemovedFromScheduler(ApplicationId appId) throws InterruptedException { int timeWaiting = 0; + drainEventsImplicitly(); Map<ApplicationId, SchedulerApplication> apps = ((AbstractYarnScheduler) getResourceScheduler()) @@ -1116,6 +1185,20 @@ public class MockRM extends ResourceManager { LOG.info("app is removed from scheduler, " + appId); } + private void drainEventsImplicitly() { + if (!disableDrainEventsImplicitly) { + drainEvents(); + } + } + + public void disableDrainEventsImplicitly() { + disableDrainEventsImplicitly = true; + } + + public void enableDrainEventsImplicityly() { + disableDrainEventsImplicitly = false; + } + public RMApp submitApp(int masterMemory, Priority priority, Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 7c02264..c4197a1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -42,9 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -53,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -168,17 +164,6 @@ public class TestApplicationCleanup { final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRM() { @Override - protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new EventDispatcher<SchedulerEvent>(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher 
createDispatcher() { return dispatcher; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index 7a24b7a..c80a799 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -245,17 +242,6 @@ public class TestNodeBlacklistingOnAMFailures { MockRM rm1 = new MockRM(conf, memStore) { @Override - protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new EventDispatcher<SchedulerEvent>(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 61fd884..d84c77d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; @@ 
-56,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -559,7 +559,7 @@ public class TestRM extends ParameterizedSchedulerTestBase { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -640,7 +640,7 @@ public class TestRM extends ParameterizedSchedulerTestBase { public void testKillFinishingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -694,7 +694,7 @@ public class TestRM extends ParameterizedSchedulerTestBase { public void testKillFailingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index a98a124..9223ef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1577,6 +1577,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // start RM final MockRM rm1 = createMockRM(conf, memStore); + rm1.disableDrainEventsImplicitly(); rm1.start(); // create apps. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index 4329033..c8baa60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,7 +21,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; import java.security.PrivilegedExceptionAction; import java.util.List; -import org.apache.hadoop.yarn.event.EventDispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -42,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; @@ -64,16 +61,6 @@ public class TestAMRMRPCNodeUpdates { "1.0"); super.init(conf); } - @Override - protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new EventDispatcher<SchedulerEvent>(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } @Override protected Dispatcher createDispatcher() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 3c3d878..6395339 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -574,7 +574,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { // AM crashes, and a new app-attempt gets created node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE); - rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED, 30 * 1000); + rm.drainEvents(); RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm); ApplicationAttemptId applicationAttemptTwoID = rmAppAttempt2.getAppAttemptId(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 21d22c3..ffbfec8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4620,6 +4620,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); + + // after YARN-5375, scheduler events are processed in the RM main dispatcher; + // wait for them to be processed, otherwise a deadlock may occur + if (resourceManager instanceof MockRM) { + ((MockRM) resourceManager).drainEvents(); + } + NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index 3ff8f6a..10aa92a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -90,6 +90,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase { rm = new MockRM(new Configuration()); rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); + rm.disableDrainEventsImplicitly(); bind(ResourceManager.class).toInstance(rm); serve("/*").with(GuiceContainer.class); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org