This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new b453cbd  HBASE-26020 Split TestWALEntryStream.testDifferentCounts out (#3409)
b453cbd is described below

commit b453cbd8197aaee7ad912e33850d0b395fcea7c4
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Wed Jun 23 22:46:07 2021 +0800

    HBASE-26020 Split TestWALEntryStream.testDifferentCounts out (#3409)
    
    Signed-off-by: Xiaolin Ha <haxiao...@apache.org>
---
 .../org/apache/hadoop/hbase/TableNameTestRule.java |   2 +-
 ...tryStream.java => TestBasicWALEntryStream.java} | 342 +++++----------------
 ...java => TestBasicWALEntryStreamAsyncFSWAL.java} |  27 +-
 ...eam.java => TestBasicWALEntryStreamFSHLog.java} |  21 +-
 .../TestWALEntryStreamDifferentCounts.java         |  89 ++++++
 ...stWALEntryStreamDifferentCountsAsyncFSWAL.java} |  28 +-
 ...> TestWALEntryStreamDifferentCountsFSHLog.java} |  25 +-
 .../regionserver/WALEntryStreamTestBase.java       | 182 +++++++++++
 8 files changed, 384 insertions(+), 332 deletions(-)

diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java
index ca7d446..56473c9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java
@@ -39,7 +39,7 @@ public class TableNameTestRule extends TestWatcher {
    * This helper strips out the parameter suffixes.
    * @return current test method name with out parameterized suffixes.
    */
-  private static String cleanUpTestName(String methodName) {
+  public static String cleanUpTestName(String methodName) {
     int index = methodName.indexOf('[');
     if (index == -1) {
       return methodName;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
similarity index 67%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index 3442f98..ad77c9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,186 +46,36 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 import org.mockito.Mockito;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestWALEntryStream {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestWALEntryStream.class);
-
-  private static final long TEST_TIMEOUT_MS = 5000;
-  protected static HBaseTestingUtility TEST_UTIL;
-  protected static Configuration CONF;
-  protected static FileSystem fs;
-  protected static MiniDFSCluster cluster;
-  private static final TableName tableName = TableName.valueOf("tablename");
-  private static final byte[] family = Bytes.toBytes("column");
-  private static final byte[] qualifier = Bytes.toBytes("qualifier");
-  private static final RegionInfo info = 
RegionInfoBuilder.newBuilder(tableName)
-      
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
-  private static final NavigableMap<byte[], Integer> scopes = getScopes();
-  private final String fakeWalGroupId = "fake-wal-group-id";
-
-  /**
-   * Test helper that waits until a non-null entry is available in the stream 
next or times out.
-   * A {@link WALEntryStream} provides a streaming access to a queue of log 
files. Since the stream
-   * can be consumed as the file is being written, callers relying on {@link 
WALEntryStream#next()}
-   * may need to retry multiple times before an entry appended to the WAL is 
visible to the stream
-   * consumers. One such cause of delay is the close() of writer writing these 
log files. While the
-   * closure is in progress, the stream does not switch to the next log in the 
queue and next() may
-   * return null entries. This utility wraps these retries into a single next 
call and that makes
-   * the test code simpler.
-   */
-  private static class WALEntryStreamWithRetries extends WALEntryStream {
-    // Class member to be able to set a non-final from within a lambda.
-    private Entry result;
-
-    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, 
Configuration conf,
-         long startPosition, WALFileLengthProvider walFileLengthProvider, 
ServerName serverName,
-         MetricsSource metrics, String walGroupId) throws IOException {
-      super(logQueue, conf, startPosition, walFileLengthProvider, serverName, 
metrics, walGroupId);
-    }
-
-    @Override
-    public Entry next() {
-      Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
-          result = WALEntryStreamWithRetries.super.next()) != null);
-      return result;
-    }
-  }
-
-  private static NavigableMap<byte[], Integer> getScopes() {
-    NavigableMap<byte[], Integer> scopes = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
-    scopes.put(family, 1);
-    return scopes;
-  }
-
-  private WAL log;
-  ReplicationSourceLogQueue logQueue;
-  private PathWatcher pathWatcher;
-
-  @Rule
-  public TestName tn = new TestName();
-  private final MultiVersionConcurrencyControl mvcc = new 
MultiVersionConcurrencyControl();
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    CONF = TEST_UTIL.getConfiguration();
-    CONF.setLong("replication.source.sleepforretries", 10);
-    TEST_UTIL.startMiniDFSCluster(3);
-
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
-  }
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
+public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
 
   @Before
   public void setUp() throws Exception {
-    ReplicationSource source = mock(ReplicationSource.class);
-    MetricsSource metricsSource = new MetricsSource("2");
-    // Source with the same id is shared and carries values from the last run
-    metricsSource.clear();
-    logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
-    pathWatcher = new PathWatcher();
-    final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
-    wals.getWALProvider().addWALActionsListener(pathWatcher);
-    log = wals.getWAL(info);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (log != null) {
-      log.close();
-    }
-  }
-
-  // Try out different combinations of row count and KeyValue count
-  @Test
-  public void testDifferentCounts() throws Exception {
-    int[] NB_ROWS = { 1500, 60000 };
-    int[] NB_KVS = { 1, 100 };
-    // whether compression is used
-    Boolean[] BOOL_VALS = { false, true };
-    // long lastPosition = 0;
-    for (int nbRows : NB_ROWS) {
-      for (int walEditKVs : NB_KVS) {
-        for (boolean isCompressionEnabled : BOOL_VALS) {
-          
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
-            isCompressionEnabled);
-          mvcc.advanceTo(1);
-
-          for (int i = 0; i < nbRows; i++) {
-            appendToLogAndSync(walEditKVs);
-          }
-
-          log.rollWriter();
-
-          try (WALEntryStream entryStream =
-              new WALEntryStream(logQueue, CONF, 0, log, null,
-                new MetricsSource("1"), fakeWalGroupId)) {
-            int i = 0;
-            while (entryStream.hasNext()) {
-              assertNotNull(entryStream.next());
-              i++;
-            }
-            assertEquals(nbRows, i);
-
-            // should've read all entries
-            assertFalse(entryStream.hasNext());
-          }
-          // reset everything for next loop
-          log.close();
-          setUp();
-        }
-      }
-    }
+    initWAL();
   }
 
   /**
@@ -235,7 +86,7 @@ public class TestWALEntryStream {
     appendToLogAndSync();
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, 0, log, null, new 
MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       // There's one edit in the log, read it. Reading past it needs to throw 
exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.peek();
@@ -249,8 +100,8 @@ public class TestWALEntryStream {
 
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, 
CONF, oldPos,
-        log, null, new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, 
CONF, oldPos, log,
+      null, new MetricsSource("1"), fakeWalGroupId)) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -263,8 +114,8 @@ public class TestWALEntryStream {
     log.rollWriter();
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, 
CONF, oldPos,
-        log, null, new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, 
CONF, oldPos, log,
+      null, new MetricsSource("1"), fakeWalGroupId)) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -288,9 +139,8 @@ public class TestWALEntryStream {
   public void testLogrollWhileStreaming() throws Exception {
     appendToLog("1");
     appendToLog("2");// 2
-    try (WALEntryStream entryStream =
-        new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, 
CONF, 0, log, null,
+      new MetricsSource("1"), fakeWalGroupId)) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -299,7 +149,7 @@ public class TestWALEntryStream {
 
       assertEquals("2", getRow(entryStream.next()));
       assertEquals(2, getQueue().size()); // we should not have dequeued yet 
since there's still an
-                                        // entry in first log
+      // entry in first log
       assertEquals("3", getRow(entryStream.next())); // if implemented 
improperly, this would be 4
                                                      // and 3 would be skipped
       assertEquals("4", getRow(entryStream.next())); // 4
@@ -316,8 +166,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, 0, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -340,17 +189,15 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, 0, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
       lastPosition = entryStream.getPosition();
     }
     // next stream should picks up where we left off
-    try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, lastPosition, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 
lastPosition, log, null,
+      new MetricsSource("1"), fakeWalGroupId)) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
@@ -368,27 +215,24 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendEntriesToLogAndSync(3);
     // read only one element
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 
lastPosition,
-        log, null, new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 
lastPosition, log, null,
+      new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
-    try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, lastPosition, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 
lastPosition, log, null,
+      new MetricsSource("1"), fakeWalGroupId)) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
     }
   }
 
-
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, 0, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -398,9 +242,9 @@ public class TestWALEntryStream {
     Map<String, byte[]> attributes = new HashMap<String, byte[]>();
     attributes.put("foo", Bytes.toBytes("foo-value"));
     attributes.put("bar", Bytes.toBytes("bar-value"));
-    WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
-        System.currentTimeMillis(), new ArrayList<UUID>(), 0L, 0L,
-        mvcc, scopes, attributes);
+    WALKeyImpl key =
+      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 
EnvironmentEdgeManager.currentTime(),
+        new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
     Assert.assertEquals(attributes, key.getExtendedAttributes());
 
     WALProtos.WALKey.Builder builder = 
key.getBuilder(WALCellCodec.getNoneCompressor());
@@ -409,12 +253,12 @@ public class TestWALEntryStream {
     WALKeyImpl deserializedKey = new WALKeyImpl();
     deserializedKey.readFieldsFromPb(serializedKey, 
WALCellCodec.getNoneUncompressor());
 
-    //equals() only checks region name, sequence id and write time
+    // equals() only checks region name, sequence id and write time
     Assert.assertEquals(key, deserializedKey);
-    //can't use Map.equals() because byte arrays use reference equality
+    // can't use Map.equals() because byte arrays use reference equality
     Assert.assertEquals(key.getExtendedAttributes().keySet(),
-        deserializedKey.getExtendedAttributes().keySet());
-    for (Map.Entry<String, byte[]> entry : 
deserializedKey.getExtendedAttributes().entrySet()){
+      deserializedKey.getExtendedAttributes().keySet());
+    for (Map.Entry<String, byte[]> entry : 
deserializedKey.getExtendedAttributes().entrySet()) {
       Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), 
entry.getValue());
     }
     Assert.assertEquals(key.getReplicationScopes(), 
deserializedKey.getReplicationScopes());
@@ -423,8 +267,8 @@ public class TestWALEntryStream {
   private ReplicationSource mockReplicationSource(boolean recovered, 
Configuration conf) {
     ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
-        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    when(mockSourceManager.getTotalBufferLimit())
+      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -432,8 +276,8 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
-        MetricsReplicationGlobalSourceSource.class);
+    MetricsReplicationGlobalSourceSource globalMetrics =
+      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
     when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }
@@ -441,20 +285,18 @@ public class TestWALEntryStream {
   private ReplicationSourceWALReader createReader(boolean recovered, 
Configuration conf) {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), 
source,
-        fakeWalGroupId);
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, 
conf, logQueue, 0,
+      getDummyFilter(), source, fakeWalGroupId);
     reader.start();
     return reader;
   }
 
   private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int 
numFailures,
-      Configuration conf) {
+    Configuration conf) {
     ReplicationSource source = mockReplicationSource(false, conf);
     when(source.isPeerEnabled()).thenReturn(true);
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, logQueue, 0,
-        getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, 
conf, logQueue, 0,
+      getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
     reader.start();
     return reader;
   }
@@ -465,8 +307,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, 0, log, null,
-          new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -497,8 +338,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null,
-        new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -508,8 +348,8 @@ public class TestWALEntryStream {
     // start up a reader
     Path walPath = getQueue().peek();
     int numFailuresInFilter = 5;
-    ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter(
-      numFailuresInFilter, CONF);
+    ReplicationSourceWALReader reader =
+      createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
     WALEntryBatch entryBatch = reader.take();
     assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
 
@@ -565,7 +405,7 @@ public class TestWALEntryStream {
       @Override
       public boolean evaluate() throws Exception {
         return fs.getFileStatus(walPath).getLen() > 0 &&
-            ((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
+          ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
       }
 
       @Override
@@ -608,13 +448,12 @@ public class TestWALEntryStream {
 
   @Test
   public void testReplicationSourceWALReaderDisabled()
-      throws IOException, InterruptedException, ExecutionException {
+    throws IOException, InterruptedException, ExecutionException {
     appendEntriesToLogAndSync(3);
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null,
-        new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -631,9 +470,8 @@ public class TestWALEntryStream {
       return enabled.get();
     });
 
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(),
-        source, fakeWalGroupId);
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, 
CONF, logQueue, 0,
+      getDummyFilter(), source, fakeWalGroupId);
     reader.start();
     Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
       return reader.take();
@@ -660,9 +498,8 @@ public class TestWALEntryStream {
   }
 
   private void appendToLog(String key) throws IOException {
-    final long txid = log.appendData(info,
-      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 
System.currentTimeMillis(),
-          mvcc, scopes), getWALEdit(key));
+    final long txid = log.appendData(info, new 
WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
     log.sync(txid);
   }
 
@@ -674,33 +511,10 @@ public class TestWALEntryStream {
     log.sync(txid);
   }
 
-  private void appendToLogAndSync() throws IOException {
-    appendToLogAndSync(1);
-  }
-
-  private void appendToLogAndSync(int count) throws IOException {
-    long txid = appendToLog(count);
-    log.sync(txid);
-  }
-
-  private long appendToLog(int count) throws IOException {
-    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 
tableName,
-      System.currentTimeMillis(), mvcc, scopes), getWALEdits(count));
-  }
-
-  private WALEdit getWALEdits(int count) {
-    WALEdit edit = new WALEdit();
-    for (int i = 0; i < count; i++) {
-      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, 
qualifier,
-          System.currentTimeMillis(), qualifier));
-    }
-    return edit;
-  }
-
   private WALEdit getWALEdit(String row) {
     WALEdit edit = new WALEdit();
-    edit.add(
-      new KeyValue(Bytes.toBytes(row), family, qualifier, 
System.currentTimeMillis(), qualifier));
+    edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier,
+      EnvironmentEdgeManager.currentTime(), qualifier));
     return edit;
   }
 
@@ -735,29 +549,18 @@ public class TestWALEntryStream {
       throw new WALEntryFilterRetryableException("failing filter");
     }
 
-    public static int numFailures(){
+    public static int numFailures() {
       return countFailures;
     }
   }
 
-  class PathWatcher implements WALActionsListener {
-
-    Path currentPath;
-
-    @Override
-    public void preLogRoll(Path oldPath, Path newPath) {
-      logQueue.enqueueLog(newPath, fakeWalGroupId);
-      currentPath = newPath;
-    }
-  }
-
   @Test
   public void testReadBeyondCommittedLength() throws IOException, 
InterruptedException {
     appendToLog("1");
     appendToLog("2");
     long size = 
log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue,  CONF, 0,
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
       p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), 
fakeWalGroupId)) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
@@ -779,8 +582,8 @@ public class TestWALEntryStream {
   }
 
   /*
-    Test removal of 0 length log from logQueue if the source is a recovered 
source and
-    size of logQueue is only 1.
+   * Test removal of 0 length log from logQueue if the source is a recovered 
source and size of
+   * logQueue is only 1.
    */
   @Test
   public void testEOFExceptionForRecoveredQueue() throws Exception {
@@ -804,9 +607,8 @@ public class TestWALEntryStream {
     doNothing().when(metrics).decrSizeOfLogQueue();
     ReplicationSourceLogQueue localLogQueue = new 
ReplicationSourceLogQueue(conf, metrics, source);
     localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
-        getDummyFilter(), source, fakeWalGroupId);
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, 
conf, localLogQueue, 0,
+      getDummyFilter(), source, fakeWalGroupId);
     reader.run();
     // ReplicationSourceWALReaderThread#handleEofException method will
     // remove empty log from logQueue.
@@ -820,7 +622,7 @@ public class TestWALEntryStream {
     ReplicationSource source = mockReplicationSource(true, conf);
     ReplicationSourceLogQueue localLogQueue = new 
ReplicationSourceLogQueue(conf, metrics, source);
     // Create a 0 length log.
-    Path emptyLog = new Path(fs.getHomeDirectory(),"log.2");
+    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
     FSDataOutputStream fsdos = fs.create(emptyLog);
     fsdos.close();
     assertEquals(0, fs.getFileStatus(emptyLog).getLen());
@@ -834,7 +636,7 @@ public class TestWALEntryStream {
     ReplicationSourceManager mockSourceManager = 
mock(ReplicationSourceManager.class);
     // Make it look like the source is from recovered source.
     when(mockSourceManager.getOldSources())
-      .thenReturn(new 
ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) 
source)));
     when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     // Override the max retries multiplier to fail fast.
@@ -842,11 +644,10 @@ public class TestWALEntryStream {
     conf.setBoolean("replication.source.eof.autorecovery", true);
     conf.setInt("replication.source.nb.batches", 10);
     // Create a reader thread.
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
-        getDummyFilter(), source, fakeWalGroupId);
-    assertEquals("Initial log queue size is not correct",
-      2, localLogQueue.getQueueSize(fakeWalGroupId));
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, 
conf, localLogQueue, 0,
+      getDummyFilter(), source, fakeWalGroupId);
+    assertEquals("Initial log queue size is not correct", 2,
+      localLogQueue.getQueueSize(fakeWalGroupId));
     reader.run();
 
     // remove empty log from logQueue.
@@ -861,11 +662,10 @@ public class TestWALEntryStream {
   private void appendEntries(WALProvider.Writer writer, int numEntries) throws 
IOException {
     for (int i = 0; i < numEntries; i++) {
       byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b,b,b);
+      KeyValue kv = new KeyValue(b, b, b);
       WALEdit edit = new WALEdit();
       edit.add(kv);
-      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
-        HConstants.DEFAULT_CLUSTER_ID);
+      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, 
HConstants.DEFAULT_CLUSTER_ID);
       NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], 
Integer>(Bytes.BYTES_COMPARATOR);
       scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
       writer.append(new WAL.Entry(key, edit));
@@ -887,8 +687,8 @@ public class TestWALEntryStream {
     // After rolling there will be 2 wals in the queue
     assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
 
-    try (WALEntryStream entryStream = new WALEntryStream(
-      logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), 
fakeWalGroupId)) {
       // There's one edit in the log, read it.
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.next();
@@ -900,13 +700,13 @@ public class TestWALEntryStream {
   }
 
   /**
-   * Tests that wals are closed cleanly and we read the trailer when we remove 
wal
-   * from WALEntryStream.
+   * Tests that wals are closed cleanly and we read the trailer when we remove 
wal from
+   * WALEntryStream.
    */
   @Test
   public void testCleanClosedWALs() throws Exception {
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
-      logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, 
CONF, 0, log, null,
+      logQueue.getMetrics(), fakeWalGroupId)) {
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
       appendToLogAndSync();
       assertNotNull(entryStream.next());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
similarity index 60%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
index 32d6ec4..6ad0d15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,29 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 
 /**
- * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} 
as the WAL provider.
+ * TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider.
  */
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestFSHLogWALEntryStream extends TestWALEntryStream {
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream 
{
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
+    HBaseClassTestRule.forClass(TestBasicWALEntryStreamAsyncFSWAL.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    CONF = TEST_UTIL.getConfiguration();
-    CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, 
AbstractFSWALProvider.class);
-    CONF.setLong("replication.source.sleepforretries", 10);
-    TEST_UTIL.startMiniDFSCluster(3);
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
+    TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, 
AsyncFSWALProvider.class,
+      AbstractFSWALProvider.class);
+    startCluster();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
similarity index 65%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
index 32d6ec4..75e85b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
@@ -29,23 +28,19 @@ import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 
 /**
- * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} 
as the WAL provider.
+ * TestBasicWALEntryStream with {@link FSHLogProvider} as the WAL provider.
  */
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestFSHLogWALEntryStream extends TestWALEntryStream {
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestBasicWALEntryStreamFSHLog extends TestBasicWALEntryStream {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
+    HBaseClassTestRule.forClass(TestBasicWALEntryStreamFSHLog.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    CONF = TEST_UTIL.getConfiguration();
-    CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, 
AbstractFSWALProvider.class);
-    CONF.setLong("replication.source.sleepforretries", 10);
-    TEST_UTIL.startMiniDFSCluster(3);
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
+    TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, 
FSHLogProvider.class,
+      AbstractFSWALProvider.class);
+    startCluster();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
new file mode 100644
index 0000000..bf45620
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.HConstants;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Try out different combinations of row count and KeyValue count
+ */
+public abstract class TestWALEntryStreamDifferentCounts extends 
WALEntryStreamTestBase {
+
+  @Parameter(0)
+  public int nbRows;
+
+  @Parameter(1)
+  public int walEditKVs;
+
+  @Parameter(2)
+  public boolean isCompressionEnabled;
+
+  @Parameters(name = "{index}: nbRows={0}, walEditKVs={1}, 
isCompressionEnabled={2}")
+  public static Iterable<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    for (int nbRows : new int[] { 1500, 60000 }) {
+      for (int walEditKVs : new int[] { 1, 100 }) {
+        for (boolean isCompressionEnabled : new boolean[] { false, true }) {
+          params.add(new Object[] { nbRows, walEditKVs, isCompressionEnabled 
});
+        }
+      }
+    }
+    return params;
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
+    initWAL();
+  }
+
+  @Test
+  public void testDifferentCounts() throws Exception {
+    mvcc.advanceTo(1);
+
+    for (int i = 0; i < nbRows; i++) {
+      appendToLogAndSync(walEditKVs);
+    }
+
+    log.rollWriter();
+
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), 
fakeWalGroupId)) {
+      int i = 0;
+      while (entryStream.hasNext()) {
+        assertNotNull(entryStream.next());
+        i++;
+      }
+      assertEquals(nbRows, i);
+
+      // should've read all entries
+      assertFalse(entryStream.hasNext());
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsAsyncFSWAL.java
similarity index 65%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsAsyncFSWAL.java
index 32d6ec4..c734f79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsAsyncFSWAL.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,30 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-/**
- * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} 
as the WAL provider.
- */
+@RunWith(Parameterized.class)
 @Category({ ReplicationTests.class, LargeTests.class })
-public class TestFSHLogWALEntryStream extends TestWALEntryStream {
+public class TestWALEntryStreamDifferentCountsAsyncFSWAL
+  extends TestWALEntryStreamDifferentCounts {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
+    
HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsAsyncFSWAL.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    CONF = TEST_UTIL.getConfiguration();
-    CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, 
AbstractFSWALProvider.class);
-    CONF.setLong("replication.source.sleepforretries", 10);
-    TEST_UTIL.startMiniDFSCluster(3);
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
+    TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, 
AsyncFSWALProvider.class,
+      AbstractFSWALProvider.class);
+    startCluster();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsFSHLog.java
similarity index 68%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsFSHLog.java
index 32d6ec4..66dc00e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsFSHLog.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -27,25 +26,21 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-/**
- * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} 
as the WAL provider.
- */
+@RunWith(Parameterized.class)
 @Category({ ReplicationTests.class, LargeTests.class })
-public class TestFSHLogWALEntryStream extends TestWALEntryStream {
+public class TestWALEntryStreamDifferentCountsFSHLog extends 
TestWALEntryStreamDifferentCounts {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
+    HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsFSHLog.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    CONF = TEST_UTIL.getConfiguration();
-    CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, 
AbstractFSWALProvider.class);
-    CONF.setLong("replication.source.sleepforretries", 10);
-    TEST_UTIL.startMiniDFSCluster(3);
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
+    TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, 
FSHLogProvider.class,
+      AbstractFSWALProvider.class);
+    startCluster();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
new file mode 100644
index 0000000..d232a6b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * Base class for WALEntryStream tests.
+ */
+public abstract class WALEntryStreamTestBase {
+
+  protected static final long TEST_TIMEOUT_MS = 5000;
+  protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static Configuration CONF;
+  protected static FileSystem fs;
+  protected static MiniDFSCluster cluster;
+  protected static final TableName tableName = TableName.valueOf("tablename");
+  protected static final byte[] family = Bytes.toBytes("column");
+  protected static final byte[] qualifier = Bytes.toBytes("qualifier");
+  protected static final RegionInfo info = 
RegionInfoBuilder.newBuilder(tableName)
+    
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
+  protected static final NavigableMap<byte[], Integer> scopes = getScopes();
+  protected final String fakeWalGroupId = "fake-wal-group-id";
+
+  /**
+   * Test helper that waits until a non-null entry is available in the stream 
next or times out. A
+   * {@link WALEntryStream} provides a streaming access to a queue of log 
files. Since the stream
+   * can be consumed as the file is being written, callers relying on {@link 
WALEntryStream#next()}
+   * may need to retry multiple times before an entry appended to the WAL is 
visible to the stream
+   * consumers. One such cause of delay is the close() of writer writing these 
log files. While the
+   * closure is in progress, the stream does not switch to the next log in the 
queue and next() may
+   * return null entries. This utility wraps these retries into a single next 
call and that makes
+   * the test code simpler.
+   */
+  protected static class WALEntryStreamWithRetries extends WALEntryStream {
+    // Class member to be able to set a non-final from within a lambda.
+    private Entry result;
+
+    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, 
Configuration conf,
+      long startPosition, WALFileLengthProvider walFileLengthProvider, 
ServerName serverName,
+      MetricsSource metrics, String walGroupId) throws IOException {
+      super(logQueue, conf, startPosition, walFileLengthProvider, serverName, 
metrics, walGroupId);
+    }
+
+    @Override
+    public Entry next() {
+      Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> {
+        result = WALEntryStreamWithRetries.super.next();
+        return result != null;
+      });
+      return result;
+    }
+  }
+
+  private static NavigableMap<byte[], Integer> getScopes() {
+    NavigableMap<byte[], Integer> scopes = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
+    scopes.put(family, 1);
+    return scopes;
+  }
+
+  class PathWatcher implements WALActionsListener {
+
+    Path currentPath;
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) {
+      logQueue.enqueueLog(newPath, fakeWalGroupId);
+      currentPath = newPath;
+    }
+  }
+
+  protected WAL log;
+  protected ReplicationSourceLogQueue logQueue;
+  protected PathWatcher pathWatcher;
+
+  @Rule
+  public TestName tn = new TestName();
+  protected final MultiVersionConcurrencyControl mvcc = new 
MultiVersionConcurrencyControl();
+
+  protected static void startCluster() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setLong("replication.source.sleepforretries", 10);
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  protected void initWAL() throws IOException {
+    ReplicationSource source = mock(ReplicationSource.class);
+    MetricsSource metricsSource = new MetricsSource("2");
+    // Source with the same id is shared and carries values from the last run
+    metricsSource.clear();
+    logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
+    pathWatcher = new PathWatcher();
+    final WALFactory wals =
+      new WALFactory(CONF, 
TableNameTestRule.cleanUpTestName(tn.getMethodName()));
+    wals.getWALProvider().addWALActionsListener(pathWatcher);
+    log = wals.getWAL(info);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Closeables.close(log, true);
+  }
+
+  protected void appendToLogAndSync() throws IOException {
+    appendToLogAndSync(1);
+  }
+
+  protected void appendToLogAndSync(int count) throws IOException {
+    long txid = appendToLog(count);
+    log.sync(txid);
+  }
+
+  protected long appendToLog(int count) throws IOException {
+    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 
tableName,
+      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
+  }
+
+  protected WALEdit getWALEdits(int count) {
+    WALEdit edit = new WALEdit();
+    for (int i = 0; i < count; i++) {
+      edit.add(new 
KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
+        EnvironmentEdgeManager.currentTime(), qualifier));
+    }
+    return edit;
+  }
+}
