Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 88512be52 -> 06c3dec2d
HBASE-16721 Concurrency issue in WAL unflushed seqId tracking

Conflicts:
  hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
  hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/06c3dec2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/06c3dec2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/06c3dec2

Branch: refs/heads/branch-1.1
Commit: 06c3dec2da32dcb588f0eb31e5db87796668bd39
Parents: 88512be
Author: Enis Soztutar <e...@apache.org>
Authored: Thu Sep 29 13:50:58 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Thu Sep 29 14:42:25 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java | 19 ++--
 .../java/org/apache/hadoop/hbase/wal/WAL.java | 2 +-
 .../hbase/regionserver/wal/TestFSHLog.java | 101 ++++++++++++++++++-
 3 files changed, 110 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/06c3dec2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f033177..cc89b84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2199,6 +2199,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 try {
 try {
 writeEntry = mvcc.beginMemstoreInsert();
+ // wait for all in-progress transactions to commit to WAL before
+ // we can start the flush.
This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. + mvcc.waitForPreviousTransactionsComplete(writeEntry); + // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block + writeEntry = null; + if (wal != null) { Long earliestUnflushedSequenceIdForTheRegion = wal.startCacheFlush(encodedRegionName, flushedFamilyNames); @@ -2275,16 +2284,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw ioe; } } - - // wait for all in-progress transactions to commit to WAL before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - writeEntry.setWriteNumber(flushOpSeqId); - mvcc.waitForPreviousTransactionsComplete(writeEntry); - // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block - writeEntry = null; } finally { if (writeEntry != null) { // in case of failure just mark current writeEntry as complete http://git-wip-us.apache.org/repos/asf/hbase/blob/06c3dec2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 473bba9..20d0834 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface WAL { +public interface WAL extends AutoCloseable { /** * Registers 
WALActionsListener http://git-wip-us.apache.org/repos/asf/hbase/blob/06c3dec2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index d14107a..1689778 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -33,7 +33,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; @@ -59,6 +63,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -389,8 +395,8 @@ public class TestFSHLog { * by slowing appends in the background ring buffer thread while in foreground we call * flush. The addition of the sync over HRegion in flush should fix an issue where flush was * returning before all of its appends had made it out to the WAL (HBASE-11109). 
+ * see HBASE-11109 * @throws IOException - * @see HBASE-11109 */ @Test public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException { @@ -494,4 +500,97 @@ public class TestFSHLog { log.close(); } } + + /** + * Test case for https://issues.apache.org/jira/browse/HBASE-16721 + */ + @Test (timeout = 30000) + public void testUnflushedSeqIdTracking() throws IOException, InterruptedException { + final String name = "testSyncRunnerIndexOverflow"; + final byte[] b = Bytes.toBytes("b"); + + final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); + final CountDownLatch holdAppend = new CountDownLatch(1); + final CountDownLatch flushFinished = new CountDownLatch(1); + final CountDownLatch putFinished = new CountDownLatch(1); + + try (FSHLog log = + new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf, + null, true, null, null)) { + + log.registerWALActionsListener(new WALActionsListener.Base() { + @Override + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { + if (startHoldingForAppend.get()) { + try { + holdAppend.await(); + } catch (InterruptedException e) { + LOG.error(e); + } + } + } + }); + + // open a new region which uses this WAL + HTableDescriptor htd = + new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b)); + HRegionInfo hri = + new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + + final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log); + ExecutorService exec = Executors.newFixedThreadPool(2); + + // do a regular write first because of memstore size calculation. 
+ region.put(new Put(b).addColumn(b, b,b)); + + startHoldingForAppend.set(true); + exec.submit(new Runnable() { + @Override + public void run() { + try { + region.put(new Put(b).addColumn(b, b,b)); + putFinished.countDown(); + } catch (IOException e) { + LOG.error(e); + } + } + }); + + // give the put a chance to start + Threads.sleep(3000); + + exec.submit(new Runnable() { + @Override + public void run() { + try { + Region.FlushResult flushResult = region.flush(true); + LOG.info("Flush result:" + flushResult.getResult()); + LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); + flushFinished.countDown(); + } catch (IOException e) { + LOG.error(e); + } + } + }); + + // give the flush a chance to start. Flush should have got the region lock, and + // should have been waiting on the mvcc complete after this. + Threads.sleep(3000); + + // let the append to WAL go through now that the flush already started + holdAppend.countDown(); + putFinished.await(); + flushFinished.await(); + + // check whether flush went through + assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size()); + + // now check the region's unflushed seqIds. + long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes()); + assertEquals("Found seqId for the region which is already flushed", + HConstants.NO_SEQNUM, seqId); + + region.close(); + } + } }