YARN-5375. invoke MockRM#drainEvents implicitly in MockRM methods to reduce test failures. Contributed by sandflee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6560351
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6560351
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6560351

Branch: refs/heads/HADOOP-13345
Commit: d65603517e52843f11cd9d3b6f6e28fca9336ee3
Parents: 61c0bed
Author: Rohith Sharma K S <rohithsharm...@apache.org>
Authored: Wed Nov 16 15:14:00 2016 +0530
Committer: Rohith Sharma K S <rohithsharm...@apache.org>
Committed: Wed Nov 16 15:14:00 2016 +0530

----------------------------------------------------------------------
 .../hadoop/yarn/event/DrainDispatcher.java      | 15 ++-
 .../resourcemanager/recovery/RMStateStore.java  | 24 +++--
 .../yarn/server/resourcemanager/MockRM.java     | 97 ++++++++++++++++++--
 .../resourcemanager/TestApplicationCleanup.java | 15 ---
 .../TestNodeBlacklistingOnAMFailures.java       | 14 ---
 .../yarn/server/resourcemanager/TestRM.java     |  8 +-
 .../server/resourcemanager/TestRMRestart.java   |  1 +
 .../TestAMRMRPCNodeUpdates.java                 | 13 ---
 .../scheduler/TestAbstractYarnScheduler.java    |  2 +-
 .../scheduler/fair/TestFairScheduler.java       |  7 ++
 .../webapp/TestRMWebServicesNodes.java          |  1 +
 11 files changed, 134 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index f769492..1369465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -17,6 +17,8 @@
 */
 package org.apache.hadoop.yarn.event;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -37,6 +39,13 @@ public class DrainDispatcher extends AsyncDispatcher {
     this.mutex = this;
   }
 
+  @Override
+  public void serviceInit(Configuration conf)
+      throws Exception {
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
+    super.serviceInit(conf);
+  }
+
   /**
    *  Wait till event thread enters WAITING state (i.e. waiting for new events).
    */
@@ -50,7 +59,7 @@ public class DrainDispatcher extends AsyncDispatcher {
    * Busy loop waiting for all queued events to drain.
    */
   public void await() {
-    while (!drained) {
+    while (!isDrained()) {
       Thread.yield();
     }
   }
@@ -96,7 +105,9 @@ public class DrainDispatcher extends AsyncDispatcher {
 
   @Override
   protected boolean isDrained() {
-    return drained;
+    synchronized (mutex) {
+      return drained;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index a6527d8..0fd346f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -671,14 +671,18 @@ public abstract class RMStateStore extends AbstractService {
   }
   
   AsyncDispatcher dispatcher;
+  @SuppressWarnings("rawtypes")
+  @VisibleForTesting
+  protected EventHandler rmStateStoreEventHandler;
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception{
     // create async handler
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
+    rmStateStoreEventHandler = new ForwardingEventHandler();
     dispatcher.register(RMStateStoreEventType.class, 
-                        new ForwardingEventHandler());
+                        rmStateStoreEventHandler);
     dispatcher.setDrainEventsOnStop();
     initInternal(conf);
   }
@@ -790,12 +794,12 @@ public abstract class RMStateStore extends AbstractService {
         ApplicationStateData.newInstance(app.getSubmitTime(),
             app.getStartTime(), context, app.getUser(), app.getCallerContext());
     appState.setApplicationTimeouts(app.getApplicationTimeouts());
-    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
+    getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
   }
 
   @SuppressWarnings("unchecked")
   public void updateApplicationState(ApplicationStateData appState) {
-    dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
+    getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState));
   }
 
   public void updateApplicationStateSynchronously(ApplicationStateData appState,
@@ -842,14 +846,14 @@ public abstract class RMStateStore extends AbstractService {
             attempMetrics.getPreemptedVcore()
             );
 
-    dispatcher.getEventHandler().handle(
+    getRMStateStoreEventHandler().handle(
       new RMStateStoreAppAttemptEvent(attemptState));
   }
 
   @SuppressWarnings("unchecked")
   public void updateApplicationAttemptState(
       ApplicationAttemptStateData attemptState) {
-    dispatcher.getEventHandler().handle(
+    getRMStateStoreEventHandler().handle(
       new RMStateUpdateAppAttemptEvent(attemptState));
   }
 
@@ -1021,7 +1025,8 @@ public abstract class RMStateStore extends AbstractService {
       appState.attempts.put(appAttempt.getAppAttemptId(), null);
     }
     
-    dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
+    getRMStateStoreEventHandler().handle(
+        new RMStateStoreRemoveAppEvent(appState));
   }
 
   /**
@@ -1042,7 +1047,7 @@ public abstract class RMStateStore extends AbstractService {
   @SuppressWarnings("unchecked")
   public synchronized void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId) {
-    dispatcher.getEventHandler().handle(
+    getRMStateStoreEventHandler().handle(
         new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId));
   }
 
@@ -1211,4 +1216,9 @@ public abstract class RMStateStore extends AbstractService {
       this.readLock.unlock();
     }
   }
+
+  @SuppressWarnings("rawtypes")
+  protected EventHandler getRMStateStoreEventHandler() {
+    return dispatcher.getEventHandler();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 3861624..ea573e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -74,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -93,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -117,6 +121,7 @@ public class MockRM extends ResourceManager {
   private static final int WAIT_MS_PER_LOOP = 10;
 
   private final boolean useNullRMNodeLabelsManager;
+  private boolean disableDrainEventsImplicitly;
 
   public MockRM() {
     this(new YarnConfiguration());
@@ -135,13 +140,41 @@ public class MockRM extends ResourceManager {
     super();
     this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
     init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
-    if(store != null) {
+    if (store != null) {
       setRMStateStore(store);
+    } else {
+      Class storeClass = getRMContext().getStateStore().getClass();
+      if (storeClass.equals(MemoryRMStateStore.class)) {
+        MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore();
+        mockStateStore.init(conf);
+        setRMStateStore(mockStateStore);
+      } else if (storeClass.equals(NullRMStateStore.class)) {
+        MockRMNullStateStore mockStateStore = new MockRMNullStateStore();
+        mockStateStore.init(conf);
+        setRMStateStore(mockStateStore);
+      }
     }
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
+    disableDrainEventsImplicitly = false;
   }
-  
+
+  public class MockRMMemoryStateStore extends MemoryRMStateStore {
+    @SuppressWarnings("rawtypes")
+    @Override
+    protected EventHandler getRMStateStoreEventHandler() {
+      return rmStateStoreEventHandler;
+    }
+  }
+
+  public class MockRMNullStateStore extends NullRMStateStore {
+    @SuppressWarnings("rawtypes")
+    @Override
+    protected EventHandler getRMStateStoreEventHandler() {
+      return rmStateStoreEventHandler;
+    }
+  }
+
   @Override
   protected RMNodeLabelsManager createNodeLabelManager()
       throws InstantiationException, IllegalAccessException {
@@ -159,6 +192,16 @@ public class MockRM extends ResourceManager {
     return new DrainDispatcher();
   }
 
+  @Override
+  protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+    return new EventHandler<SchedulerEvent>() {
+      @Override
+      public void handle(SchedulerEvent event) {
+        scheduler.handle(event);
+      }
+    };
+  }
+
   public void drainEvents() {
     Dispatcher rmDispatcher = getRmDispatcher();
     if (rmDispatcher instanceof DrainDispatcher) {
@@ -170,6 +213,7 @@ public class MockRM extends ResourceManager {
 
   private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates)
       throws InterruptedException {
+    drainEventsImplicitly();
     RMApp app = getRMContext().getRMApps().get(appId);
     Assert.assertNotNull("app shouldn't be null", app);
     final int timeoutMsecs = 80 * SECOND;
@@ -200,6 +244,7 @@ public class MockRM extends ResourceManager {
    */
   public void waitForState(ApplicationId appId, RMAppState finalState)
       throws InterruptedException {
+    drainEventsImplicitly();
     RMApp app = getRMContext().getRMApps().get(appId);
     Assert.assertNotNull("app shouldn't be null", app);
     final int timeoutMsecs = 80 * SECOND;
@@ -245,6 +290,7 @@ public class MockRM extends ResourceManager {
   public void waitForState(ApplicationAttemptId attemptId,
       RMAppAttemptState finalState, int timeoutMsecs)
       throws InterruptedException {
+    drainEventsImplicitly();
     RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
     Assert.assertNotNull("app shouldn't be null", app);
     RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
@@ -295,6 +341,7 @@ public class MockRM extends ResourceManager {
 
   public void waitForContainerToComplete(RMAppAttempt attempt,
       NMContainerStatus completedContainer) throws InterruptedException {
+    drainEventsImplicitly();
     int timeWaiting = 0;
     while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
       List<ContainerStatus> containers = attempt.getJustFinishedContainers();
@@ -394,6 +441,7 @@ public class MockRM extends ResourceManager {
    */
   public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
       RMContainerState containerState, int timeoutMsecs) throws Exception {
+    drainEventsImplicitly();
     RMContainer container = getResourceScheduler().getRMContainer(containerId);
     int timeWaiting = 0;
     while (container == null) {
@@ -404,6 +452,7 @@ public class MockRM extends ResourceManager {
       for (MockNM nm : nms) {
         nm.nodeHeartbeat(true);
       }
+      drainEventsImplicitly();
       container = getResourceScheduler().getRMContainer(containerId);
       LOG.info("Waiting for container " + containerId + " to be "
           + containerState + ", container is null right now.");
@@ -421,6 +470,7 @@ public class MockRM extends ResourceManager {
       for (MockNM nm : nms) {
         nm.nodeHeartbeat(true);
       }
+      drainEventsImplicitly();
       Thread.sleep(WAIT_MS_PER_LOOP);
       timeWaiting += WAIT_MS_PER_LOOP;
     }
@@ -698,7 +748,7 @@ public class MockRM extends ResourceManager {
   public MockNM registerNode(String nodeIdStr, int memory) throws Exception {
     MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService());
     nm.registerNode();
-    drainEvents();
+    drainEventsImplicitly();
     return nm;
   }
 
@@ -707,7 +757,7 @@ public class MockRM extends ResourceManager {
     MockNM nm =
         new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService());
     nm.registerNode();
-    drainEvents();
+    drainEventsImplicitly();
     return nm;
   }
   
@@ -717,7 +767,7 @@ public class MockRM extends ResourceManager {
         new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
             YarnVersionInfo.getVersion());
     nm.registerNode(runningApplications);
-    drainEvents();
+    drainEventsImplicitly();
     return nm;
   }
 
@@ -725,12 +775,14 @@ public class MockRM extends ResourceManager {
     RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
         nm.getNodeId());
     node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
+    drainEventsImplicitly();
   }
   
   public void sendNodeLost(MockNM nm) throws Exception {
     RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
         nm.getNodeId());
     node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
+    drainEventsImplicitly();
   }
 
   /**
@@ -743,6 +795,7 @@ public class MockRM extends ResourceManager {
    */
   public void waitForState(NodeId nodeId, NodeState finalState)
       throws InterruptedException {
+    drainEventsImplicitly();
     RMNode node = getRMContext().getRMNodes().get(nodeId);
     if (node == null) {
       node = getRMContext().getInactiveRMNodes().get(nodeId);
@@ -774,7 +827,9 @@ public class MockRM extends ResourceManager {
   public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
-    return client.forceKillApplication(req);
+    KillApplicationResponse response = client.forceKillApplication(req);
+    drainEventsImplicitly();
+    return response;
   }
 
   public FailApplicationAttemptResponse failApplicationAttempt(
@@ -782,7 +837,10 @@ public class MockRM extends ResourceManager {
     ApplicationClientProtocol client = getClientRMService();
     FailApplicationAttemptRequest req =
         FailApplicationAttemptRequest.newInstance(attemptId);
-    return client.failApplicationAttempt(req);
+    FailApplicationAttemptResponse response =
+        client.failApplicationAttempt(req);
+    drainEventsImplicitly();
+    return response;
   }
 
   /**
@@ -807,6 +865,7 @@ public class MockRM extends ResourceManager {
         .getEventHandler()
         .handle(
             new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED));
+    drainEventsImplicitly();
     return am;
   }
 
@@ -817,6 +876,7 @@ public class MockRM extends ResourceManager {
     getRMContext().getDispatcher().getEventHandler()
         .handle(new RMAppAttemptEvent(appAttemptId,
             RMAppAttemptEventType.LAUNCH_FAILED, "Failed"));
+    drainEventsImplicitly();
   }
 
   @Override
@@ -966,6 +1026,7 @@ public class MockRM extends ResourceManager {
     am.unregisterAppAttempt(req,true);
     rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
     nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm.drainEventsImplicitly();
     rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
     rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
   }
@@ -974,6 +1035,7 @@ public class MockRM extends ResourceManager {
   private static void waitForSchedulerAppAttemptAdded(
       ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
     int tick = 0;
+    rm.drainEventsImplicitly();
     // Wait for at most 5 sec
     while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
         .getApplicationAttempt(attemptId) && tick < 50) {
@@ -1015,9 +1077,11 @@ public class MockRM extends ResourceManager {
    */
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
+    rm.drainEventsImplicitly();
     RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
     LOG.info("Launch AM " + attempt.getAppAttemptId());
     nm.nodeHeartbeat(true);
+    rm.drainEventsImplicitly();
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
     rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
     return am;
@@ -1025,12 +1089,14 @@ public class MockRM extends ResourceManager {
 
   public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
+    rm.drainEventsImplicitly();
     // UAMs go directly to LAUNCHED state
     rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
     LOG.info("Launch AM " + attempt.getAppAttemptId());
     nm.nodeHeartbeat(true);
+    rm.drainEventsImplicitly();
     MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
         attempt.getAppAttemptId());
     rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
@@ -1067,6 +1133,7 @@ public class MockRM extends ResourceManager {
       throws IOException, YarnException {
     ApplicationClientProtocol client = getClientRMService();
     client.updateReservation(request);
+    drainEventsImplicitly();
   }
 
   // Explicitly reset queue metrics for testing.
@@ -1087,6 +1154,7 @@ public class MockRM extends ResourceManager {
     SignalContainerRequest req =
         SignalContainerRequest.newInstance(containerId, command);
     client.signalToContainer(req);
+    drainEventsImplicitly();
   }
 
   /**
@@ -1099,6 +1167,7 @@ public class MockRM extends ResourceManager {
   public void waitForAppRemovedFromScheduler(ApplicationId appId)
       throws InterruptedException {
     int timeWaiting = 0;
+    drainEventsImplicitly();
 
     Map<ApplicationId, SchedulerApplication> apps  =
         ((AbstractYarnScheduler) getResourceScheduler())
@@ -1116,6 +1185,20 @@ public class MockRM extends ResourceManager {
     LOG.info("app is removed from scheduler, " + appId);
   }
 
+  private void drainEventsImplicitly() {
+    if (!disableDrainEventsImplicitly) {
+      drainEvents();
+    }
+  }
+
+  public void disableDrainEventsImplicitly() {
+    disableDrainEventsImplicitly = true;
+  }
+
+  public void enableDrainEventsImplicityly() {
+    disableDrainEventsImplicitly = false;
+  }
+
   public RMApp submitApp(int masterMemory, Priority priority,
       Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception {
     Resource resource = Resource.newInstance(masterMemory, 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 7c02264..c4197a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -42,9 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -53,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -168,17 +164,6 @@ public class TestApplicationCleanup {
     final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm = new MockRM() {
       @Override
-      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new EventDispatcher<SchedulerEvent>(this.scheduler,
-            this.scheduler.getClass().getName()) {
-          @Override
-          public void handle(SchedulerEvent event) {
-            super.handle(event);
-          }
-        };
-      }
-
-      @Override
       protected Dispatcher createDispatcher() {
         return dispatcher;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
index 7a24b7a..c80a799 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.EventDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -245,17 +242,6 @@ public class TestNodeBlacklistingOnAMFailures {
 
     MockRM rm1 = new MockRM(conf, memStore) {
       @Override
-      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new EventDispatcher<SchedulerEvent>(this.scheduler,
-            this.scheduler.getClass().getName()) {
-          @Override
-          public void handle(SchedulerEvent event) {
-            super.handle(event);
-          }
-        };
-      }
-
-      @Override
       protected Dispatcher createDispatcher() {
         return dispatcher;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
index 61fd884..d84c77d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.junit.Before;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doNothing;
@@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -559,7 +559,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
   @Test (timeout = 60000)
   public void testApplicationKillAtAcceptedState() throws Exception {
 
-    final Dispatcher dispatcher = new AsyncDispatcher() {
+    final Dispatcher dispatcher = new DrainDispatcher() {
       @Override
       public EventHandler getEventHandler() {
 
@@ -640,7 +640,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
   public void testKillFinishingApp() throws Exception{
 
     // this dispatcher ignores RMAppAttemptEventType.KILL event
-    final Dispatcher dispatcher = new AsyncDispatcher() {
+    final Dispatcher dispatcher = new DrainDispatcher() {
       @Override
       public EventHandler getEventHandler() {
 
@@ -694,7 +694,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
   public void testKillFailingApp() throws Exception{
 
     // this dispatcher ignores RMAppAttemptEventType.KILL event
-    final Dispatcher dispatcher = new AsyncDispatcher() {
+    final Dispatcher dispatcher = new DrainDispatcher() {
       @Override
       public EventHandler getEventHandler() {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index a98a124..9223ef3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1577,6 +1577,7 @@ public class TestRMRestart extends 
ParameterizedSchedulerTestBase {
 
     // start RM
     final MockRM rm1 = createMockRM(conf, memStore);
+    rm1.disableDrainEventsImplicitly();
     rm1.start();
 
     // create apps.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index 4329033..c8baa60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
-import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -42,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.After;
 import org.junit.Before;
@@ -64,16 +61,6 @@ public class TestAMRMRPCNodeUpdates {
           "1.0");
         super.init(conf);
       }
-      @Override
-      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new EventDispatcher<SchedulerEvent>(this.scheduler,
-            this.scheduler.getClass().getName()) {
-          @Override
-          public void handle(SchedulerEvent event) {
-            super.handle(event);
-          }
-        };
-      }
 
       @Override
       protected Dispatcher createDispatcher() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index 3c3d878..6395339 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -574,7 +574,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
 
       // AM crashes, and a new app-attempt gets created
       node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE);
-      rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED, 30 * 1000);
+      rm.drainEvents();
       RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm);
       ApplicationAttemptId applicationAttemptTwoID =
           rmAppAttempt2.getAppAttemptId();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 21d22c3..ffbfec8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -4620,6 +4620,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
             containerManagerPort, httpPort, rackName, capability,
             resourceManager);
+
+    // After YARN-5375, scheduler events are processed in the RM main
+    // dispatcher; wait for them to be processed, or a deadlock may occur.
+    if (resourceManager instanceof MockRM) {
+      ((MockRM) resourceManager).drainEvents();
+    }
+
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
             .get(nm.getNodeId()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6560351/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 3ff8f6a..10aa92a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -90,6 +90,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
       rm = new MockRM(new Configuration());
       rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
       rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+      rm.disableDrainEventsImplicitly();
       bind(ResourceManager.class).toInstance(rm);
       serve("/*").with(GuiceContainer.class);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to