YARN-8051. TestRMEmbeddedElector#testCallbackSynchronization is flaky. (Robert Kanter via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/93d47a0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/93d47a0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/93d47a0e

Branch: refs/heads/HDFS-7240
Commit: 93d47a0ed504ee81d4b74d340c1815bdbb3c9b14
Parents: 2be64eb
Author: Haibo Chen <haiboc...@apache.org>
Authored: Tue Apr 3 07:58:21 2018 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Tue Apr 3 07:59:20 2018 -0700

----------------------------------------------------------------------
 .../resourcemanager/TestRMEmbeddedElector.java  | 72 +++++++++++++-------
 1 file changed, 49 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d47a0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index 140483a..9d38149 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -22,18 +22,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +52,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
 
   private Configuration conf;
   private AtomicBoolean callbackCalled;
+  private AtomicInteger transitionToActiveCounter;
+  private AtomicInteger transitionToStandbyCounter;
 
   private enum SyncTestType {
     ACTIVE,
@@ -75,6 +81,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
 
     callbackCalled = new AtomicBoolean(false);
+    transitionToActiveCounter = new AtomicInteger(0);
+    transitionToStandbyCounter = new AtomicInteger(0);
   }
 
   /**
@@ -103,7 +111,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    */
   @Test
   public void testCallbackSynchronization()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     testCallbackSynchronization(SyncTestType.ACTIVE);
     testCallbackSynchronization(SyncTestType.STANDBY);
     testCallbackSynchronization(SyncTestType.NEUTRAL);
@@ -117,9 +125,10 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @param type the type of test to run
    * @throws IOException if there's an issue transitioning
    * @throws InterruptedException if interrupted
+   * @throws TimeoutException if waitFor timeout reached
    */
   private void testCallbackSynchronization(SyncTestType type)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     AdminService as = mock(AdminService.class);
     RMContext rc = mock(RMContext.class);
     ResourceManager rm = mock(ResourceManager.class);
@@ -129,6 +138,17 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     when(rm.getRMContext()).thenReturn(rc);
     when(rc.getRMAdminService()).thenReturn(as);
 
+    doAnswer(invocation -> {
+      transitionToActiveCounter.incrementAndGet();
+      return null;
+    }).when(as).transitionToActive(any());
+    transitionToActiveCounter.set(0);
+    doAnswer(invocation -> {
+      transitionToStandbyCounter.incrementAndGet();
+      return null;
+    }).when(as).transitionToStandby(any());
+    transitionToStandbyCounter.set(0);
+
     ActiveStandbyElectorBasedElectorService ees =
         new ActiveStandbyElectorBasedElectorService(rm);
     ees.init(myConf);
@@ -165,15 +185,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @param ees the embedded elector service
    * @throws IOException if there's an issue transitioning
    * @throws InterruptedException if interrupted
+   * @throws TimeoutException if waitFor timeout reached
    */
   private void testCallbackSynchronizationActive(AdminService as,
       ActiveStandbyElectorBasedElectorService ees)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     ees.becomeActive();
 
-    Thread.sleep(100);
-
-    verify(as).transitionToActive(any());
+    GenericTestUtils.waitFor(
+        () -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000);
+    verify(as, times(1)).transitionToActive(any());
     verify(as, never()).transitionToStandby(any());
   }
 
@@ -185,16 +206,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @param ees the embedded elector service
    * @throws IOException if there's an issue transitioning
    * @throws InterruptedException if interrupted
+   * @throws TimeoutException if waitFor timeout reached
    */
   private void testCallbackSynchronizationStandby(AdminService as,
       ActiveStandbyElectorBasedElectorService ees)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     ees.becomeStandby();
 
-    Thread.sleep(100);
-
-    verify(as, atLeast(1)).transitionToStandby(any());
-    verify(as, atMost(1)).transitionToStandby(any());
+    GenericTestUtils.waitFor(
+        () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
+    verify(as, times(1)).transitionToStandby(any());
   }
 
   /**
@@ -204,16 +225,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @param ees the embedded elector service
    * @throws IOException if there's an issue transitioning
    * @throws InterruptedException if interrupted
+   * @throws TimeoutException if waitFor timeout reached
    */
   private void testCallbackSynchronizationNeutral(AdminService as,
       ActiveStandbyElectorBasedElectorService ees)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     ees.enterNeutralMode();
 
-    Thread.sleep(100);
-
-    verify(as, atLeast(1)).transitionToStandby(any());
-    verify(as, atMost(1)).transitionToStandby(any());
+    GenericTestUtils.waitFor(
+        () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
+    verify(as, times(1)).transitionToStandby(any());
   }
 
   /**
@@ -224,10 +245,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @param ees the embedded elector service
    * @throws IOException if there's an issue transitioning
    * @throws InterruptedException if interrupted
+   * @throws TimeoutException if waitFor timeout reached
    */
   private void testCallbackSynchronizationTimingActive(AdminService as,
       ActiveStandbyElectorBasedElectorService ees)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     synchronized (ees.zkDisconnectLock) {
       // Sleep while holding the lock so that the timer thread can't do
       // anything when it runs.  Sleep until we're pretty sure the timer thread
@@ -243,7 +265,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     // going to do, hopefully nothing.
     Thread.sleep(50);
 
-    verify(as).transitionToActive(any());
+    GenericTestUtils.waitFor(
+        () -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000);
+    verify(as, times(1)).transitionToActive(any());
     verify(as, never()).transitionToStandby(any());
   }
 
@@ -255,10 +279,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @param ees the embedded elector service
    * @throws IOException if there's an issue transitioning
    * @throws InterruptedException if interrupted
+   * @throws TimeoutException if waitFor timeout reached
    */
   private void testCallbackSynchronizationTimingStandby(AdminService as,
       ActiveStandbyElectorBasedElectorService ees)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     synchronized (ees.zkDisconnectLock) {
       // Sleep while holding the lock so that the timer thread can't do
       // anything when it runs.  Sleep until we're pretty sure the timer thread
@@ -274,8 +299,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     // going to do, hopefully nothing.
     Thread.sleep(50);
 
-    verify(as, atLeast(1)).transitionToStandby(any());
-    verify(as, atMost(1)).transitionToStandby(any());
+    GenericTestUtils.waitFor(
+        () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
+    verify(as, times(1)).transitionToStandby(any());
   }
 
   private class MockRMWithElector extends MockRM {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to