Repository: hbase
Updated Branches:
  refs/heads/branch-2 9329a18c2 -> 9c8c749cd


HBASE-18158 Two running in-memory compaction threads may lose data for flushing


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9c8c749c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9c8c749c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9c8c749c

Branch: refs/heads/branch-2
Commit: 9c8c749cd35bea3df2d7dd1fe857ec6452e4e68a
Parents: 9329a18
Author: Chia-Ping Tsai <chia7...@gmail.com>
Authored: Wed Jun 7 14:10:48 2017 +0800
Committer: Chia-Ping Tsai <chia7...@gmail.com>
Committed: Wed Jun 7 18:04:30 2017 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  |  11 +-
 .../hadoop/hbase/regionserver/TestStore.java    | 101 +++++++++++++++++++
 2 files changed, 109 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9c8c749c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 8d71efc..5b9372a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -94,10 +94,15 @@ public class CompactingMemStore extends AbstractMemStore {
     this.store = store;
     this.regionServices = regionServices;
     this.pipeline = new CompactionPipeline(getRegionServices());
-    this.compactor = new MemStoreCompactor(this, compactionPolicy);
+    this.compactor = createMemStoreCompactor(compactionPolicy);
     initInmemoryFlushSize(conf);
   }
 
+  @VisibleForTesting
+  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy 
compactionPolicy) {
+    return new MemStoreCompactor(this, compactionPolicy);
+  }
+
   private void initInmemoryFlushSize(Configuration conf) {
     long memstoreFlushSize = getRegionServices().getMemstoreFlushSize();
     int numStores = getRegionServices().getNumStores();
@@ -410,7 +415,8 @@ public class CompactingMemStore extends AbstractMemStore {
     return getRegionServices().getInMemoryCompactionPool();
   }
 
-  private boolean shouldFlushInMemory() {
+  @VisibleForTesting
+  protected boolean shouldFlushInMemory() {
     if (this.active.keySize() > inmemoryFlushSize) { // size above flush 
threshold
       if (inWalReplay) {  // when replaying edits from WAL there is no need in 
in-memory flush
         return false;     // regardless the size
@@ -430,7 +436,6 @@ public class CompactingMemStore extends AbstractMemStore {
   private void stopCompaction() {
     if (inMemoryFlushInProgress.get()) {
       compactor.stop();
-      inMemoryFlushInProgress.set(false);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9c8c749c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 1946d43..22539c5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -70,6 +71,7 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -102,6 +104,7 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Test class for the Store
@@ -1213,6 +1216,59 @@ public class TestStore {
     }
   }
 
+  /**
+   * If there are two running InMemoryFlushRunnable, the later 
InMemoryFlushRunnable
+   * may change the versionedList. And the first InMemoryFlushRunnable will 
use the chagned
+   * versionedList to remove the corresponding segments.
+   * In short, there will be some segements which isn't in merge are removed.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=30000)
+  public void testRunDoubleMemStoreCompactors() throws IOException, 
InterruptedException {
+    int flushSize = 500;
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HStore.MEMSTORE_CLASS_NAME, 
MyCompactingMemStoreWithCustomCompactor.class.getName());
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 
String.valueOf(flushSize));
+    // Set the lower threshold to invoke the "MERGE" policy
+    conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 
String.valueOf(0));
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+    init(name.getMethodName(), conf, hcd);
+    byte[] value = Bytes.toBytes("thisisavarylargevalue");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // older data whihc shouldn't be "seen" by client
+    store.add(createCell(qf1, ts, seqId, value), memStoreSize);
+    store.add(createCell(qf2, ts, seqId, value), memStoreSize);
+    store.add(createCell(qf3, ts, seqId, value), memStoreSize);
+    assertEquals(1, 
MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    storeFlushCtx.prepare();
+    // This shouldn't invoke another in-memory flush because the first 
compactor thread
+    // hasn't accomplished the in-memory compaction.
+    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
+    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
+    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
+    assertEquals(1, 
MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
+    //okay. Let the compaction be completed
+    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
+    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
+    while (mem.isMemStoreFlushingInMemory()) {
+      TimeUnit.SECONDS.sleep(1);
+    }
+    // This should invoke another in-memory flush.
+    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
+    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
+    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
+    assertEquals(2, 
MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
+    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+  }
+
   private MyStore initMyStore(String methodName, Configuration conf, 
MyScannerHook hook) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1242,6 +1298,51 @@ public class TestStore {
     void hook(MyStore store) throws IOException;
   }
 
+  private static class MyMemStoreCompactor extends MemStoreCompactor {
+    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
+    private static final CountDownLatch START_COMPACTOR_LATCH = new 
CountDownLatch(1);
+    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 
MemoryCompactionPolicy compactionPolicy) {
+      super(compactingMemStore, compactionPolicy);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
+      boolean rval = super.start();
+      if (isFirst) {
+        try {
+          START_COMPACTOR_LATCH.await();
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+      return rval;
+    }
+  }
+
+  public static class MyCompactingMemStoreWithCustomCompactor extends 
CompactingMemStore {
+    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
+    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, 
CellComparator c,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, c, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy 
compactionPolicy) {
+      return new MyMemStoreCompactor(this, compactionPolicy);
+    }
+
+    @Override
+    protected boolean shouldFlushInMemory() {
+      boolean rval = super.shouldFlushInMemory();
+      if (rval) {
+        RUNNER_COUNT.incrementAndGet();
+      }
+      return rval;
+    }
+  }
+
   public static class MyCompactingMemStore extends CompactingMemStore {
     private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
     private final CountDownLatch getScannerLatch = new CountDownLatch(1);

Reply via email to