Repository: hbase
Updated Branches:
  refs/heads/master 856ee283f -> 2e813f106


HBASE-19493 Make TestWALMonotonicallyIncreasingSeqId also work with AsyncFSWAL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2e813f10
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2e813f10
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2e813f10

Branch: refs/heads/master
Commit: 2e813f106f2647f8704378efbf3531051c5aea78
Parents: 856ee28
Author: zhangduo <zhang...@apache.org>
Authored: Tue Dec 12 11:33:02 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Dec 12 20:56:58 2017 +0800

----------------------------------------------------------------------
 .../TestWALMonotonicallyIncreasingSeqId.java    | 181 +++++++++++--------
 1 file changed, 104 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2e813f10/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index fabd075..b537890 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -18,93 +18,116 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.junit.Assert;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
- * Test for HBASE-17471
- * MVCCPreAssign is added by HBASE-16698, but pre-assign mvcc is only used in put/delete
- * path. Other write paths like increment/append still assign mvcc in ringbuffer's consumer
- * thread. If put and increment are used parallel. Then seqid in WAL may not increase monotonically
- * Disorder in wals will lead to data loss.
- * This case use two thread to put and increment at the same time in a single region.
- * Then check the seqid in WAL. If seqid is wal is not monotonically increasing, this case will fail
- *
+ * Test for HBASE-17471.
+ * <p>
+ * MVCCPreAssign is added by HBASE-16698, but pre-assign mvcc is only used in put/delete path. Other
+ * write paths like increment/append still assign mvcc in ringbuffer's consumer thread. If put and
+ * increment are used parallel. Then seqid in WAL may not increase monotonically Disorder in wals
+ * will lead to data loss.
+ * <p>
+ * This case use two thread to put and increment at the same time in a single region. Then check the
+ * seqid in WAL. If seqid is wal is not monotonically increasing, this case will fail
  */
-@Category({RegionServerTests.class, SmallTests.class})
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, SmallTests.class })
 public class TestWALMonotonicallyIncreasingSeqId {
-  final Log LOG = LogFactory.getLog(getClass());
+  private final Log LOG = LogFactory.getLog(getClass());
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId");
   private WALFactory wals;
   private FileSystem fileSystem;
   private Configuration walConf;
+  private HRegion region;
 
-  public static final String KEY_SEED = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
-
-  private static final int KEY_SEED_LEN = KEY_SEED.length();
-
-  private static final char[] KEY_SEED_CHARS = KEY_SEED.toCharArray();
+  @Parameter
+  public String walProvider;
 
   @Rule
   public TestName name = new TestName();
 
-  private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    for (byte[] family : families) {
-      HColumnDescriptor hcd = new HColumnDescriptor(family);
-      // Set default to be three versions.
-      hcd.setMaxVersions(Integer.MAX_VALUE);
-      htd.addFamily(hcd);
-    }
-    return htd;
+  @Parameters(name = "{index}: wal={0}")
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
   }
 
-  private Region initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
+  private TableDescriptor getTableDesc(TableName tableName, byte[]... families) {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    Arrays.stream(families).map(
+      f -> ColumnFamilyDescriptorBuilder.newBuilder(f).setMaxVersions(Integer.MAX_VALUE).build())
+        .forEachOrdered(builder::addColumnFamily);
+    return builder.build();
+  }
+
+  private HRegion initHRegion(TableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
       throws IOException {
     Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set("hbase.wal.provider", walProvider);
     conf.setBoolean("hbase.hregion.mvcc.preassign", false);
     Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
 
-    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
-    fileSystem =  tableDir.getFileSystem(conf);
-    HRegionFileSystem fs = new HRegionFileSystem(conf, fileSystem, tableDir, info);
+    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey)
+        .setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build();
+    fileSystem = tableDir.getFileSystem(conf);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, tableDir);
     this.walConf = walConf;
     wals = new WALFactory(walConf, null, "log_" + replicaId);
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegion region = HRegion.createHRegion(info, TEST_UTIL.getDefaultRootDirPath(), conf, htd,
-        wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()));
+      wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()));
     return region;
   }
 
   CountDownLatch latch = new CountDownLatch(1);
+
   public class PutThread extends Thread {
     HRegion region;
+
     public PutThread(HRegion region) {
       this.region = region;
     }
@@ -112,102 +135,106 @@ public class TestWALMonotonicallyIncreasingSeqId {
     @Override
     public void run() {
       try {
-        for(int i = 0; i < 100; i++) {
+        for (int i = 0; i < 100; i++) {
           byte[] row = Bytes.toBytes("putRow" + i);
           Put put = new Put(row);
           put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes(""));
-          //put.setDurability(Durability.ASYNC_WAL);
           latch.await();
-          region.batchMutate(new Mutation[]{put});
+          region.batchMutate(new Mutation[] { put });
           Thread.sleep(10);
         }
 
-
       } catch (Throwable t) {
         LOG.warn("Error happend when Increment: ", t);
       }
-
     }
   }
 
   public class IncThread extends Thread {
     HRegion region;
+
     public IncThread(HRegion region) {
       this.region = region;
     }
+
     @Override
     public void run() {
       try {
-        for(int i = 0; i < 100; i++) {
+        for (int i = 0; i < 100; i++) {
           byte[] row = Bytes.toBytes("incrementRow" + i);
           Increment inc = new Increment(row);
           inc.addColumn("cf".getBytes(), Bytes.toBytes(0), 1);
-          //inc.setDurability(Durability.ASYNC_WAL);
+          // inc.setDurability(Durability.ASYNC_WAL);
           region.increment(inc);
           latch.countDown();
           Thread.sleep(10);
         }
 
-
       } catch (Throwable t) {
         LOG.warn("Error happend when Put: ", t);
       }
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    byte[][] families = new byte[][] { Bytes.toBytes("cf") };
+    TableDescriptor htd = getTableDesc(
+      TableName.valueOf(name.getMethodName().replaceAll("[^0-9A-Za-z_]", "_")), families);
+    region = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (region != null) {
+      region.close();
+    }
+  }
 
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  private WAL.Reader createReader(Path logPath, Path oldWalsDir) throws IOException {
+    try {
+      return wals.createReader(fileSystem, logPath);
+    } catch (IOException e) {
+      return wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
     }
   }
 
   @Test
-  public void TestWALMonotonicallyIncreasingSeqId() throws Exception {
-    byte[][] families = new byte[][] {Bytes.toBytes("cf")};
-    byte[] qf = Bytes.toBytes("cq");
-    HTableDescriptor htd = getTableDesc(TableName.valueOf(name.getMethodName()), families);
-    HRegion region = (HRegion)initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0);
+  public void testWALMonotonicallyIncreasingSeqId() throws Exception {
     List<Thread> putThreads = new ArrayList<>();
-    for(int i = 0; i < 1; i++) {
+    for (int i = 0; i < 1; i++) {
       putThreads.add(new PutThread(region));
     }
     IncThread incThread = new IncThread(region);
-    for(int i = 0; i < 1; i++) {
+    for (int i = 0; i < 1; i++) {
       putThreads.get(i).start();
     }
     incThread.start();
     incThread.join();
 
-    Path logPath = ((FSHLog) region.getWAL()).getCurrentFileName();
+    Path logPath = ((AbstractFSWAL<?>) region.getWAL()).getCurrentFileName();
     region.getWAL().rollWriter();
     Thread.sleep(10);
     Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
     Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    WAL.Reader reader = null;
-    try {
-      reader = wals.createReader(fileSystem, logPath);
-    } catch (Throwable t) {
-      reader = wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
-
-    }
-    WAL.Entry e;
-    try {
+    try (WAL.Reader reader = createReader(logPath, oldWalsDir)) {
       long currentMaxSeqid = 0;
-      while ((e = reader.next()) != null) {
+      for (WAL.Entry e; (e = reader.next()) != null;) {
         if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
           long currentSeqid = e.getKey().getSequenceId();
-          if(currentSeqid > currentMaxSeqid) {
+          if (currentSeqid > currentMaxSeqid) {
             currentMaxSeqid = currentSeqid;
           } else {
-            Assert.fail("Current max Seqid is " + currentMaxSeqid
-                + ", but the next seqid in wal is smaller:" + currentSeqid);
+            fail("Current max Seqid is " + currentMaxSeqid +
+              ", but the next seqid in wal is smaller:" + currentSeqid);
           }
         }
       }
-    } finally {
-      if(reader != null) {
-        reader.close();
-      }
-      if(region != null) {
-        region.close();
-      }
     }
   }
-
-
 }

Reply via email to