Repository: hbase
Updated Branches:
  refs/heads/branch-1.4 c51c78469 -> cf5e170bc


HBASE-16615 Fix flaky TestScannerHeartbeatMessages (Duo Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cf5e170b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cf5e170b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cf5e170b

Branch: refs/heads/branch-1.4
Commit: cf5e170bc7f099b95636e8656e1a5d1fd5d2019d
Parents: c51c784
Author: Michael Stack <st...@apache.org>
Authored: Mon Aug 21 16:01:26 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Mon Aug 21 16:02:57 2017 -0700

----------------------------------------------------------------------
 .../TestScannerHeartbeatMessages.java           | 105 ++++++++-----------
 1 file changed, 41 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cf5e170b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index c85e41c..fe938b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -22,6 +22,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -67,9 +71,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
  * Here we test to make sure that scans return the expected Results when the server is sending the
  * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
@@ -117,12 +118,10 @@ public class TestScannerHeartbeatMessages {
 
   // In this test, we sleep after reading each row. So we should make sure after we get some number
   // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
-  // So set this to 200, we will get 3 rows and reach time limit at the start of 4th row, then sleep
-  // for the 4th time. Total time is 800 ms so we will not timeout.
-  private static int DEFAULT_ROW_SLEEP_TIME = 200;
+  private static int DEFAULT_ROW_SLEEP_TIME = 300;
 
   // Similar with row sleep time.
-  private static int DEFAULT_CF_SLEEP_TIME = 200;
+  private static int DEFAULT_CF_SLEEP_TIME = 300;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -182,7 +181,6 @@ public class TestScannerHeartbeatMessages {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.deleteTable(TABLE_NAME);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -197,26 +195,13 @@ public class TestScannerHeartbeatMessages {
   }
 
   /**
-   * Test a variety of scan configurations to ensure that they return the expected Results when
-   * heartbeat messages are necessary. These tests are accumulated under one test case to ensure
-   * that they don't run in parallel. If the tests ran in parallel, they may conflict with each
-   * other due to changing static variables
-   */
-  @Test
-  public void testScannerHeartbeatMessages() throws Exception {
-    testImportanceOfHeartbeats(testHeartbeatBetweenRows());
-    testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
-    testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
-  }
-
-  /**
    * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
    * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
    * disabled, the test should throw an exception.
    * @param testCallable
    * @throws InterruptedException
    */
-  public void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
+  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
     HeartbeatRPCServices.heartbeatsEnabled = true;
 
     try {
@@ -243,8 +228,9 @@ public class TestScannerHeartbeatMessages {
    * fetched.
    * @throws Exception
    */
-  public Callable<Void> testHeartbeatBetweenRows() throws Exception {
-    return new Callable<Void>() {
+  @Test
+  public void testHeartbeatBetweenRows() throws Exception {
+    testImportanceOfHeartbeats(new Callable<Void>() {
 
       @Override
       public Void call() throws Exception {
@@ -257,15 +243,16 @@ public class TestScannerHeartbeatMessages {
         testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
         return null;
       }
-    };
+    });
   }
 
   /**
    * Test the case that the time limit for scans is reached in between column families
    * @throws Exception
    */
-  public Callable<Void> testHeartbeatBetweenColumnFamilies() throws Exception {
-    return new Callable<Void>() {
+  @Test
+  public void testHeartbeatBetweenColumnFamilies() throws Exception {
+    testImportanceOfHeartbeats(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
         // Configure the scan so that it can read the entire table in a single RPC. We want to test
@@ -282,24 +269,23 @@ public class TestScannerHeartbeatMessages {
         testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
         return null;
       }
-    };
+    });
   }
 
-  public static class SparseFilter extends FilterBase{
+  public static class SparseFilter extends FilterBase {
 
     @Override
     public ReturnCode filterKeyValue(Cell v) throws IOException {
       try {
-        Thread.sleep(CLIENT_TIMEOUT/2 + 10);
+        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
-          ReturnCode.INCLUDE :
-          ReturnCode.SKIP;
+      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
+          : ReturnCode.SKIP;
     }
 
-    public static Filter parseFrom(final byte [] pbBytes){
+    public static Filter parseFrom(final byte[] pbBytes) {
       return new SparseFilter();
     }
   }
@@ -308,8 +294,9 @@ public class TestScannerHeartbeatMessages {
    * Test the case that there is a filter which filters most of cells
    * @throws Exception
    */
-  public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
-    return new Callable<Void>() {
+  @Test
+  public void testHeartbeatWithSparseFilter() throws Exception {
+    testImportanceOfHeartbeats(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
         Scan scan = new Scan();
@@ -339,7 +326,7 @@ public class TestScannerHeartbeatMessages {
 
         return null;
       }
-    };
+    });
   }
 
   /**
@@ -352,7 +339,7 @@ public class TestScannerHeartbeatMessages {
    *          that column family are fetched
    * @throws Exception
    */
-  public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
+  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
       int cfSleepTime, boolean sleepBeforeCf) throws Exception {
     disableSleeping();
     final ResultScanner scanner = TABLE.getScanner(scan);
@@ -427,14 +414,14 @@ public class TestScannerHeartbeatMessages {
    * Custom RSRpcServices instance that allows heartbeat support to be toggled
    */
   private static class HeartbeatRPCServices extends RSRpcServices {
-    private static boolean heartbeatsEnabled = true;
+    private static volatile boolean heartbeatsEnabled = true;
 
     public HeartbeatRPCServices(HRegionServer rs) throws IOException {
       super(rs);
     }
 
     @Override
-    public ScanResponse scan(RpcController controller, ScanRequest request) 
+    public ScanResponse scan(RpcController controller, ScanRequest request)
         throws ServiceException {
       ScanRequest.Builder builder = ScanRequest.newBuilder(request);
       builder.setClientHandlesHeartbeats(heartbeatsEnabled);
@@ -449,17 +436,17 @@ public class TestScannerHeartbeatMessages {
    */
   private static class HeartbeatHRegion extends HRegion {
     // Row sleeps occur AFTER each row worth of cells is retrieved.
-    private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
-    private static boolean sleepBetweenRows = false;
+    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
+    private static volatile boolean sleepBetweenRows = false;
 
     // The sleep for column families can be initiated before or after we fetch the cells for the
     // column family. If the sleep occurs BEFORE then the time limits will be reached inside
     // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
     // limit will be reached inside RegionScanner after all the cells for a column family have been
     // retrieved.
-    private static boolean sleepBeforeColumnFamily = false;
-    private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
-    private static boolean sleepBetweenColumnFamilies = false;
+    private static volatile boolean sleepBeforeColumnFamily = false;
+    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
+    private static volatile boolean sleepBetweenColumnFamilies = false;
 
     public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
         HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
@@ -472,20 +459,14 @@ public class TestScannerHeartbeatMessages {
     }
 
     private static void columnFamilySleep() {
-      if (HeartbeatHRegion.sleepBetweenColumnFamilies) {
-        try {
-          Thread.sleep(HeartbeatHRegion.columnFamilySleepTime);
-        } catch (InterruptedException e) {
-        }
+      if (sleepBetweenColumnFamilies) {
+        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
       }
     }
 
     private static void rowSleep() {
-      try {
-        if (HeartbeatHRegion.sleepBetweenRows) {
-          Thread.sleep(HeartbeatHRegion.rowSleepTime);
-        }
-      } catch (InterruptedException e) {
+      if (sleepBetweenRows) {
+        Threads.sleepWithoutInterrupt(rowSleepTime);
       }
     }
 
@@ -514,8 +495,7 @@ public class TestScannerHeartbeatMessages {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> outResults, ScannerContext context)
-        throws IOException {
+    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
       boolean moreRows = super.nextRaw(outResults, context);
       HeartbeatHRegion.rowSleep();
       return moreRows;
@@ -542,8 +522,7 @@ public class TestScannerHeartbeatMessages {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> outResults, ScannerContext context)
-        throws IOException {
+    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
       boolean moreRows = super.nextRaw(outResults, context);
       HeartbeatHRegion.rowSleep();
       return moreRows;
@@ -575,8 +554,7 @@ public class TestScannerHeartbeatMessages {
     }
 
     @Override
-    public boolean next(List<Cell> result, ScannerContext context)
-        throws IOException {
+    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
       if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
       boolean moreRows = super.next(result, context);
       if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
@@ -589,14 +567,13 @@ public class TestScannerHeartbeatMessages {
    * cells.
    */
   private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
-    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners, 
+    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
         KVComparator comparator) throws IOException {
       super(scanners, comparator);
     }
 
     @Override
-    public boolean next(List<Cell> result, ScannerContext context)
-        throws IOException {
+    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
       if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
       boolean moreRows = super.next(result, context);
       if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();

Reply via email to