HBASE-16721 Concurrency issue in WAL unflushed seqId tracking

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f77f1530
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f77f1530
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f77f1530

Branch: refs/heads/branch-1.3
Commit: f77f1530d4cebd1679bc1c27782bc283638dbd5f
Parents: 728f58a
Author: Enis Soztutar <e...@apache.org>
Authored: Thu Sep 29 13:50:58 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Thu Sep 29 14:53:29 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  19 ++--
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   2 +-
 .../hbase/regionserver/wal/TestFSHLog.java      | 101 ++++++++++++++++++-
 3 files changed, 110 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f77f1530/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d43e838..520286f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2293,6 +2293,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
     long trxId = 0;
     MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
+    // wait for all in-progress transactions to commit to WAL before
+    // we can start the flush. This prevents
+    // uncommitted transactions from being written into HFiles.
+    // We have to block before we start the flush, otherwise keys that
+    // were removed via a rollbackMemstore could be written to Hfiles.
+    mvcc.completeAndWait(writeEntry);
+    // set writeEntry to null to prevent mvcc.complete from being called again 
inside finally
+    // block
+    writeEntry = null;
     try {
       try {
         if (wal != null) {
@@ -2371,16 +2380,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           throw ioe;
         }
       }
-
-      // wait for all in-progress transactions to commit to WAL before
-      // we can start the flush. This prevents
-      // uncommitted transactions from being written into HFiles.
-      // We have to block before we start the flush, otherwise keys that
-      // were removed via a rollbackMemstore could be written to Hfiles.
-      mvcc.completeAndWait(writeEntry);
-      // set writeEntry to null to prevent mvcc.complete from being called 
again inside finally
-      // block
-      writeEntry = null;
     } finally {
       if (writeEntry != null) {
         // In case of failure just mark current writeEntry as complete.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f77f1530/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..041a5b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -47,7 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL {
+public interface WAL extends AutoCloseable {
 
   /**
    * Registers WALActionsListener

http://git-wip-us.apache.org/repos/asf/hbase/blob/f77f1530/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 6ece700..760cdc1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -30,6 +30,10 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -343,8 +348,8 @@ public class TestFSHLog {
    * by slowing appends in the background ring buffer thread while in 
foreground we call
    * flush.  The addition of the sync over HRegion in flush should fix an 
issue where flush was
    * returning before all of its appends had made it out to the WAL 
(HBASE-11109).
+   * see HBASE-11109
    * @throws IOException
-   * @see HBASE-11109
    */
   @Test
   public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws 
IOException {
@@ -448,4 +453,98 @@ public class TestFSHLog {
       log.close();
     }
   }
+
+  /**
+   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
+   */
+  @Test (timeout = 30000)
+  public void testUnflushedSeqIdTracking() throws IOException, 
InterruptedException {
+    final String name = "testSyncRunnerIndexOverflow";
+    final byte[] b = Bytes.toBytes("b");
+
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch flushFinished = new CountDownLatch(1);
+    final CountDownLatch putFinished = new CountDownLatch(1);
+
+    try (FSHLog log =
+        new FSHLog(fs, FSUtils.getRootDir(conf), name, 
HConstants.HREGION_OLDLOGDIR_NAME, conf,
+            null, true, null, null)) {
+
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey 
logKey, WALEdit logEdit)
+            throws IOException {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e);
+            }
+          }
+        }
+      });
+
+      // open a new region which uses this WAL
+      HTableDescriptor htd =
+          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new 
HColumnDescriptor(b));
+      HRegionInfo hri =
+          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, 
HConstants.EMPTY_END_ROW);
+
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+
+      // do a regular write first because of memstore size calculation.
+      region.put(new Put(b).addColumn(b, b,b));
+
+      startHoldingForAppend.set(true);
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            region.put(new Put(b).addColumn(b, b,b));
+            putFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the put a chance to start
+      Threads.sleep(3000);
+
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Region.FlushResult flushResult = region.flush(true);
+            LOG.info("Flush result:" +  flushResult.getResult());
+            LOG.info("Flush succeeded:" +  flushResult.isFlushSucceeded());
+            flushFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the flush a chance to start. Flush should have got the region 
lock, and
+      // should have been waiting on the mvcc complete after this.
+      Threads.sleep(3000);
+
+      // let the append to WAL go through now that the flush already started
+      holdAppend.countDown();
+      putFinished.await();
+      flushFinished.await();
+
+      // check whether flush went through
+      assertEquals("Region did not flush?", 1, region.getStoreFileList(new 
byte[][]{b}).size());
+
+      // now check the region's unflushed seqIds.
+      long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
+      assertEquals("Found seqId for the region which is already flushed",
+          HConstants.NO_SEQNUM, seqId);
+
+      region.close();
+    }
+  }
 }

Reply via email to