Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 88512be52 -> 06c3dec2d


HBASE-16721 Concurrency issue in WAL unflushed seqId tracking

Conflicts:
        
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
        
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/06c3dec2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/06c3dec2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/06c3dec2

Branch: refs/heads/branch-1.1
Commit: 06c3dec2da32dcb588f0eb31e5db87796668bd39
Parents: 88512be
Author: Enis Soztutar <e...@apache.org>
Authored: Thu Sep 29 13:50:58 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Thu Sep 29 14:42:25 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  19 ++--
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   2 +-
 .../hbase/regionserver/wal/TestFSHLog.java      | 101 ++++++++++++++++++-
 3 files changed, 110 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/06c3dec2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f033177..cc89b84 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2199,6 +2199,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     try {
       try {
         writeEntry = mvcc.beginMemstoreInsert();
+        // wait for all in-progress transactions to commit to WAL before       
                                                                                
                                               
+        // we can start the flush. This prevents
+        // uncommitted transactions from being written into HFiles.
+        // We have to block before we start the flush, otherwise keys that
+        // were removed via a rollbackMemstore could be written to Hfiles.
+        mvcc.waitForPreviousTransactionsComplete(writeEntry);
+        // set w to null to prevent mvcc.advanceMemstore from being called 
again inside finally block
+        writeEntry = null;
+
         if (wal != null) {
           Long earliestUnflushedSequenceIdForTheRegion =
               wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
@@ -2275,16 +2284,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           throw ioe;
         }
       }
-
-      // wait for all in-progress transactions to commit to WAL before
-      // we can start the flush. This prevents
-      // uncommitted transactions from being written into HFiles.
-      // We have to block before we start the flush, otherwise keys that
-      // were removed via a rollbackMemstore could be written to Hfiles.
-      writeEntry.setWriteNumber(flushOpSeqId);
-      mvcc.waitForPreviousTransactionsComplete(writeEntry);
-      // set w to null to prevent mvcc.advanceMemstore from being called again 
inside finally block
-      writeEntry = null;
     } finally {
       if (writeEntry != null) {
         // in case of failure just mark current writeEntry as complete

http://git-wip-us.apache.org/repos/asf/hbase/blob/06c3dec2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 473bba9..20d0834 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL {
+public interface WAL extends AutoCloseable {
 
   /**
    * Registers WALActionsListener

http://git-wip-us.apache.org/repos/asf/hbase/blob/06c3dec2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index d14107a..1689778 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -33,7 +33,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
@@ -59,6 +63,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -389,8 +395,8 @@ public class TestFSHLog {
    * by slowing appends in the background ring buffer thread while in 
foreground we call
    * flush.  The addition of the sync over HRegion in flush should fix an 
issue where flush was
    * returning before all of its appends had made it out to the WAL 
(HBASE-11109).
+   * see HBASE-11109
    * @throws IOException
-   * @see HBASE-11109
    */
   @Test
   public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws 
IOException {
@@ -494,4 +500,97 @@ public class TestFSHLog {
       log.close();
     }
   }
+
+  /**
+   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
+   */
+  @Test (timeout = 30000)
+  public void testUnflushedSeqIdTracking() throws IOException, 
InterruptedException {
+    final String name = "testSyncRunnerIndexOverflow";
+    final byte[] b = Bytes.toBytes("b");
+
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch flushFinished = new CountDownLatch(1);
+    final CountDownLatch putFinished = new CountDownLatch(1);
+
+    try (FSHLog log =
+        new FSHLog(fs, FSUtils.getRootDir(conf), name, 
HConstants.HREGION_OLDLOGDIR_NAME, conf,
+            null, true, null, null)) {
+
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey 
logKey, WALEdit logEdit) {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e);
+            }
+          }
+        }
+      });
+
+      // open a new region which uses this WAL
+      HTableDescriptor htd =
+          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new 
HColumnDescriptor(b));
+      HRegionInfo hri =
+          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, 
HConstants.EMPTY_END_ROW);
+
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+
+      // do a regular write first because of memstore size calculation.
+      region.put(new Put(b).addColumn(b, b,b));
+
+      startHoldingForAppend.set(true);
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            region.put(new Put(b).addColumn(b, b,b));
+            putFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the put a chance to start
+      Threads.sleep(3000);
+
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Region.FlushResult flushResult = region.flush(true);
+            LOG.info("Flush result:" +  flushResult.getResult());
+            LOG.info("Flush succeeded:" +  flushResult.isFlushSucceeded());
+            flushFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the flush a chance to start. Flush should have got the region 
lock, and
+      // should have been waiting on the mvcc complete after this.
+      Threads.sleep(3000);
+
+      // let the append to WAL go through now that the flush already started
+      holdAppend.countDown();
+      putFinished.await();
+      flushFinished.await();
+
+      // check whether flush went through
+      assertEquals("Region did not flush?", 1, region.getStoreFileList(new 
byte[][]{b}).size());
+
+      // now check the region's unflushed seqIds.
+      long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
+      assertEquals("Found seqId for the region which is already flushed",
+          HConstants.NO_SEQNUM, seqId);
+
+      region.close();
+    }
+  }
 }

Reply via email to