http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index b4adf48..75ef5c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -33,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -65,8 +63,7 @@ public class TestNodeBlacklistingOnAMFailures { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -122,7 +119,7 @@ public class TestNodeBlacklistingOnAMFailures { // Try the current node a few times for (int i = 0; i <= 2; i++) { currentNode.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals( "AppAttemptState should still be SCHEDULED if currentNode is " @@ -132,7 +129,7 @@ public class TestNodeBlacklistingOnAMFailures { // Now try the other node otherNode.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -169,8 +166,7 @@ public class TestNodeBlacklistingOnAMFailures { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -227,7 +223,7 @@ public class TestNodeBlacklistingOnAMFailures { System.out.println("New AppAttempt launched " + attempt.getAppAttemptId()); nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -257,8 +253,7 @@ public class TestNodeBlacklistingOnAMFailures { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -319,7 +314,7 @@ public class TestNodeBlacklistingOnAMFailures { nm3.nodeHeartbeat(true); nm4.nodeHeartbeat(true); nm5.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -352,8 +347,7 @@ public class TestNodeBlacklistingOnAMFailures { 1.5f); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); MockNM node = new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); @@ -367,7 +361,7 @@ public class TestNodeBlacklistingOnAMFailures { // Now the AM container should be allocated RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -394,7 +388,7 @@ public class TestNodeBlacklistingOnAMFailures { .println("New AppAttempt launched " + attempt.getAppAttemptId()); node.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -418,20 +412,13 @@ public class TestNodeBlacklistingOnAMFailures { rm.waitForState(amAttemptID.getApplicationId(), RMAppState.ACCEPTED); } - private MockRM startRM(YarnConfiguration conf, - final DrainDispatcher dispatcher) { - + private MockRM startRM(YarnConfiguration conf) { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(conf, memStore); - rm1.start(); - return rm1; + rm.start(); + return rm; } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java index 5a6fe67..f746dc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; @@ -186,9 +185,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase { rm.registerNode("127.0.0.1:1", memory, vCores); int attempts = 10; do { - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); - dispatcher.await(); + rm1.drainEvents(); rm.getRMContext().getReservationSystem() .synchronizePlan(ReservationSystemTestUtil.reservationQ, false); if (rm.getRMContext().getReservationSystem() http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index c8baa60..f9f0b74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -47,12 +45,10 @@ import org.junit.Test; public class TestAMRMRPCNodeUpdates { private MockRM rm; - ApplicationMasterService amService = null; - DrainDispatcher dispatcher = null; + private ApplicationMasterService amService; @Before public void setUp() { - dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override public void init(Configuration conf) { @@ -61,12 +57,8 @@ public class TestAMRMRPCNodeUpdates { "1.0"); super.init(conf); } - - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } }; + rm.start(); amService = rm.getApplicationMasterService(); } @@ -80,14 +72,14 @@ public class TestAMRMRPCNodeUpdates { private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception { nm.nodeHeartbeat(health); - dispatcher.await(); + rm.drainEvents(); } private void syncNodeLost(MockNM nm) throws Exception { rm.sendNodeStarted(nm); rm.waitForState(nm.getNodeId(), NodeState.RUNNING); rm.sendNodeLost(nm); - dispatcher.await(); + rm.drainEvents(); } private AllocateResponse allocate(final ApplicationAttemptId attemptId, @@ -113,7 +105,7 @@ public class TestAMRMRPCNodeUpdates { MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000); MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000); MockNM nm4 = rm.registerNode("127.0.0.4:1234", 10000); - dispatcher.await(); + rm.drainEvents(); RMApp app1 = rm.submitApp(2000); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index e7c7e51..6a7325c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -228,21 +227,16 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { // The node(127.0.0.1:1234) reconnected with RM. When it registered with // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But // the node's heartbeat come before RM succeeded setting the id to 0. - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = new MockRM(){ - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(); rm.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); int i = 0; while(i < 3) { nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); i++; } @@ -251,7 +245,7 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { nm2.registerNode(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING, rmNode.getState()); rm.stop(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 893f802..db31448 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index ff52efd..fd17bd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -36,8 +36,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -612,24 +610,17 @@ public class TestApplicationPriority { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); - final DrainDispatcher dispatcher = new DrainDispatcher(); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); nm1.registerNode(); - - dispatcher.await(); + rm1.drainEvents(); ResourceScheduler scheduler = rm1.getRMContext().getScheduler(); LeafQueue defaultQueue = @@ -648,7 +639,7 @@ public class TestApplicationPriority { MockAM am2 = MockRM.launchAM(app2, rm1, nm1); am2.registerAppAttempt(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); Assert.assertEquals(0, defaultQueue.getNumPendingApplications()); @@ -657,7 +648,7 @@ public class TestApplicationPriority { Priority appPriority3 = Priority.newInstance(7); RMApp app3 = rm1.submitApp(memory, appPriority3); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); @@ -676,14 +667,8 @@ public class TestApplicationPriority { Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), fcApp3.getApplicationAttemptId()); - final DrainDispatcher dispatcher1 = new DrainDispatcher(); // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher1; - } - }; + MockRM rm2 = new MockRM(conf, memStore); // start new RM rm2.start(); @@ -693,7 +678,7 @@ public class TestApplicationPriority { // Verify RM Apps after this restart Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); - dispatcher1.await(); + rm2.drainEvents(); scheduler = rm2.getRMContext().getScheduler(); defaultQueue = (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); @@ -714,7 +699,7 @@ public class TestApplicationPriority { // NM resync to new RM nm1.registerNode(); - dispatcher1.await(); + rm2.drainEvents(); // wait for activating applications count = 50; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index a451356..d4e7727 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -50,8 +50,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; @@ -199,7 +197,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { StartContainersResponse mockResponse = mock(StartContainersResponse.class); when(containerManager.startContainers((StartContainersRequest) any())) .thenReturn(mockResponse); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { @@ -209,11 +206,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { }; @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - - @Override protected void doSecureLogin() throws IOException { } }; @@ -225,11 +217,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { // Set up a node. MockNM nm1 = rm.registerNode("localhost:1234", 3072); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); - nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = @@ -436,7 +427,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { StartContainersResponse mockResponse = mock(StartContainersResponse.class); when(containerManager.startContainers((StartContainersRequest) any())) .thenReturn(mockResponse); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { @@ -446,11 +436,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { }; @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - - @Override protected void doSecureLogin() throws IOException { } }; @@ -462,10 +447,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { // Set up a node. MockNM nm1 = rm.registerNode("localhost:1234", 3072); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
