Repository: hbase
Updated Branches:
  refs/heads/master 1b12a6039 -> 988d1f9bc


http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 3405b49..277eb48 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -121,7 +123,7 @@ public class TestWalAndCompactingMemStoreFlush {
   }
 
   @Test(timeout = 180000)
-  public void testSelectiveFlushWhenEnabled() throws IOException {
+  public void testSelectiveFlushWithDataCompaction() throws IOException {
 
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
@@ -130,9 +132,11 @@ public class TestWalAndCompactingMemStoreFlush {
         FlushNonSloppyStoresFirstPolicy.class.getName());
     
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
 200 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 
0.25);
+    // set memstore to do data compaction
+    conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
 
     // Intialize the region
-    Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
+    Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
 
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
@@ -313,7 +317,7 @@ public class TestWalAndCompactingMemStoreFlush {
     assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
 
     // CF3 should be bottleneck for WAL
-     assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, 
smallestSeqCF3PhaseIV);
+    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, 
smallestSeqCF3PhaseIV);
 
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // Trying to clean the existing memstores, CF2 all flushed to disk. The 
single
@@ -330,7 +334,7 @@ public class TestWalAndCompactingMemStoreFlush {
         
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
     assertTrue(
-        CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD < 
cf1MemstoreSizePhaseV);
+        CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD <= 
cf1MemstoreSizePhaseV);
     assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
         cf2MemstoreSizePhaseV);
     assertEquals(CompactingMemStore.DEEP_OVERHEAD + 
MutableSegment.DEEP_OVERHEAD,
@@ -371,9 +375,13 @@ public class TestWalAndCompactingMemStoreFlush {
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
+  
/*------------------------------------------------------------------------------*/
+  /* Check the same as above but for index-compaction type of compacting 
memstore */
   @Test(timeout = 180000)
-  public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException {
+  public void testSelectiveFlushWithIndexCompaction() throws IOException {
 
+    
/*------------------------------------------------------------------------------*/
+    /* SETUP */
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
@@ -381,18 +389,17 @@ public class TestWalAndCompactingMemStoreFlush {
         FlushNonSloppyStoresFirstPolicy.class.getName());
     
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
 200 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 
0.5);
+    // set memstore to index-compaction
+    conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
 
-    // set memstore segment flattening to false and compact to skip-list
-    conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false);
-    conf.setInt("hbase.hregion.compacting.memstore.type",1);
-
-    // Intialize the region
-    Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
+    // Initialize the region
+    Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
 
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE I - insertions */
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));        // compacted memstore
-
       if (i <= 100) {
         region.put(createPut(2, i));
         if (i <= 50) {
@@ -400,41 +407,32 @@ public class TestWalAndCompactingMemStoreFlush {
         }
       }
     }
-
     // Now add more puts for CF2, so that we only flush CF2 to disk
     for (int i = 100; i < 2000; i++) {
       region.put(createPut(2, i));
     }
 
-    long totalMemstoreSize = region.getMemstoreSize();
-
+    
/*------------------------------------------------------------------------------*/
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE I - collect sizes */
+    long totalMemstoreSizePhaseI = region.getMemstoreSize();
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
-
     // Find the sizes of the memstores of each CF.
     long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
     long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
     long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
-
     // Get the overall smallest LSN in the region's memstores.
     long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
         
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
-    String s = "\n\n----------------------------------\n"
-        + "Upon initial insert and before any flush, size of CF1 is:"
-        + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
-        + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
-        + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
-        + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
-        + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
-        + region.getStore(FAMILY3).isSloppyMemstore() + "\n";
-
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE I - validation */
     // The overall smallest LSN in the region's memstores should be the same as
     // the LSN of the smallest edit in CF1
     assertEquals(smallestSeqCF1PhaseI, 
smallestSeqInRegionCurrentMemstorePhaseI);
-
     // Some other sanity checks.
     assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
     assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
@@ -444,149 +442,169 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    String msg = "totalMemstoreSize="+totalMemstoreSize +
-        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
-        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
-        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
-        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
-    assertEquals(msg,
-        totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + 
MutableSegment.DEEP_OVERHEAD)
-            + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
+    assertEquals(
+        totalMemstoreSizePhaseI
+            + 1 * DefaultMemStore.DEEP_OVERHEAD
+            + 2 * CompactingMemStore.DEEP_OVERHEAD
+            + 3 * MutableSegment.DEEP_OVERHEAD,
         cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
 
-    // Flush!!!!!!!!!!!!!!!!!!!!!!
-    // We have big compacting memstore CF1 and two small memstores:
-    // CF2 (not compacted) and CF3 (compacting)
-    // All together they are above the flush size lower bound.
-    // Since CF1 and CF3 should be flushed to memory (not to disk),
-    // CF2 is going to be flushed to disk.
-    // CF1 - nothing to compact, CF3 - should be twice compacted
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE I - Flush */
+    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
+    // CF1, CF2, CF3, all together they are above the flush size lower bound.
+    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
+    // CF1 and CF3 - flushed to memory and flattened explicitly
+    region.flush(false);
     CompactingMemStore cms1 = (CompactingMemStore) ((HStore) 
region.getStore(FAMILY1)).memstore;
     CompactingMemStore cms3 = (CompactingMemStore) ((HStore) 
region.getStore(FAMILY3)).memstore;
     cms1.flushInMemory();
     cms3.flushInMemory();
-    region.flush(false);
 
+    // CF3/CF1 should be merged so wait here to be sure the compaction is done
+    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+
+    
/*------------------------------------------------------------------------------*/
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE II - collect sizes */
     // Recalculate everything
     long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
     long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
     long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
-
     long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
         
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt to each CF.
-    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
-    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
+    long totalMemstoreSizePhaseII = region.getMemstoreSize();
 
-    s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
-        + ", CompactingMemStore DEEP_OVERHEAD is:" + 
CompactingMemStore.DEEP_OVERHEAD
-        + "\n----After first flush! CF1 should be flushed to memory, but not 
compacted.---\n"
-        + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + 
cf2MemstoreSizePhaseII
-        + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
-
-    // CF1 was flushed to memory, but there is nothing to compact, should
-    // remain the same size plus renewed empty skip-list
-    assertEquals(s, cf1MemstoreSizePhaseII, cf1MemstoreSizePhaseI
-        + ImmutableSegment.DEEP_OVERHEAD_CAM + 
CompactionPipeline.ENTRY_OVERHEAD);
-
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE II - validation */
+    // CF1 was flushed to memory, should be flattened and take less space
+    assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
     // CF2 should become empty
     assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
         cf2MemstoreSizePhaseII);
-
-    // verify that CF3 was flushed to memory and was compacted (this is 
approximation check)
-    assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
-        + ImmutableSegment.DEEP_OVERHEAD_CAM
-        + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
-    assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII);
-
+    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
+    // if compacted, CF3 would be at least two times smaller because every key in it was duplicated
+    assertTrue(cf3MemstoreSizePhaseI / 2 < cf3MemstoreSizePhaseII);
 
     // Now the smallest LSN in the region should be the same as the smallest
     // LSN in the memstore of CF1.
     assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, 
smallestSeqCF1PhaseI);
-
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
+    // items in CF1/3
+    assertEquals(
+        totalMemstoreSizePhaseII
+            + 1 * DefaultMemStore.DEEP_OVERHEAD
+            + 2 * CompactingMemStore.DEEP_OVERHEAD
+            + 3 * MutableSegment.DEEP_OVERHEAD
+            + 2 * CompactionPipeline.ENTRY_OVERHEAD
+            + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM,
+        cf1MemstoreSizePhaseII + cf2MemstoreSizePhaseII + 
cf3MemstoreSizePhaseII);
+
+    
/*------------------------------------------------------------------------------*/
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE III - insertions */
     // Now add more puts for CF1, so that we also flush CF1 to disk instead of
-    // memory in next flush
-    for (int i = 1200; i < 2000; i++) {
+    // memory in next flush. This causes CF1 to be flushed to memory twice.
+    for (int i = 1200; i < 8000; i++) {
       region.put(createPut(1, i));
     }
 
-    s = s + "The smallest sequence in region WAL is: " + 
smallestSeqInRegionCurrentMemstorePhaseII
-        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
-        "the smallest sequence in CF2:"
-        + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + 
smallestSeqCF3PhaseII + "\n";
+    // CF1 should be flattened and merged, so wait here to be sure the compaction is done
+    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
 
-    // How much does the CF1 memstore occupy? Will be used later.
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE III - collect sizes */
+    // How much does the CF1 memstore occupy now? Will be used later.
     long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
-    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
-
-    s = s + "----After more puts into CF1 its size is:" + 
cf1MemstoreSizePhaseIII
-        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
+    long totalMemstoreSizePhaseIII = region.getMemstoreSize();
 
-
-    // Flush!!!!!!!!!!!!!!!!!!!!!!
-    // Flush again, CF1 is flushed to disk
-    // CF2 is flushed to disk, because it is not in-memory compacted memstore
-    // CF3 is flushed empty to memory (actually nothing happens to CF3)
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE III - validation */
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
+    // items in CF1/3
+    assertEquals(
+        totalMemstoreSizePhaseIII
+            + 1 * DefaultMemStore.DEEP_OVERHEAD
+            + 2 * CompactingMemStore.DEEP_OVERHEAD
+            + 3 * MutableSegment.DEEP_OVERHEAD
+            + 2 * CompactionPipeline.ENTRY_OVERHEAD
+            + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM,
+        cf1MemstoreSizePhaseIII + cf2MemstoreSizePhaseII + 
cf3MemstoreSizePhaseII);
+
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE III - Flush */
+    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
+    // CF1 is flushed to disk, but not entirely emptied.
+    // CF2 was and remained empty, same way nothing happens to CF3
     region.flush(false);
 
+    
/*------------------------------------------------------------------------------*/
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE IV - collect sizes */
     // Recalculate everything
     long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
     long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
     long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
-
     long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
         
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
-    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
-    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
 
-    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + 
", CF2 size is:"
-        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
-        + "\n";
-
-    s = s + "The smallest sequence in region WAL is: " + 
smallestSeqInRegionCurrentMemstorePhaseIV
-        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
-        "the smallest sequence in CF2:"
-        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + 
smallestSeqCF3PhaseIV
-        + "\n";
-
-    // CF1's pipeline component (inserted before first flush) should be 
flushed to disk
-    // CF2 should be flushed to disk
-    assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + 
CompactingMemStore.DEEP_OVERHEAD
-        + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV);
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE IV - validation */
+    // CF1's biggest pipeline component (inserted before first flush) should 
be flushed to disk
+    // CF2 should remain empty
+    assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
     assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
         cf2MemstoreSizePhaseIV);
-
     // CF3 shouldn't have been touched.
     assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
-
     // the smallest LSN of CF3 shouldn't change
     assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
-
     // CF3 should be bottleneck for WAL
-    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, 
smallestSeqCF3PhaseIV);
+    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, 
smallestSeqCF3PhaseIV);
 
-    // Flush!!!!!!!!!!!!!!!!!!!!!!
-    // Clearing the existing memstores, CF2 all flushed to disk. The single
-    // memstore segment in the compaction pipeline of CF1 and CF3 should be 
flushed to disk.
-    // Note that active sets of CF1 and CF3 are empty
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE IV - Flush */
+    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
+    // Force flush to disk on all memstores (flush parameter true).
+    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are 
empty
     region.flush(true);
 
+    
/*------------------------------------------------------------------------------*/
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE V - collect sizes */
     // Recalculate everything
     long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
     long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
     long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
     long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
         
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long totalMemstoreSizePhaseV = region.getMemstoreSize();
 
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE V - validation */
     assertEquals(CompactingMemStore.DEEP_OVERHEAD + 
MutableSegment.DEEP_OVERHEAD,
         cf1MemstoreSizePhaseV);
     assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
         cf2MemstoreSizePhaseV);
     assertEquals(CompactingMemStore.DEEP_OVERHEAD + 
MutableSegment.DEEP_OVERHEAD,
         cf3MemstoreSizePhaseV);
-
+    // The total memstore size should be zero
+    assertEquals(totalMemstoreSizePhaseV, 0);
     // Because there is nothing in any memstore the WAL's LSN should be -1
     assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, 
HConstants.NO_SEQNUM);
 
@@ -594,6 +612,9 @@ public class TestWalAndCompactingMemStoreFlush {
     // any Column Family above the threshold?
     // In that case, we should flush all the CFs.
 
+    
/*------------------------------------------------------------------------------*/
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE VI - insertions */
     // The memstore limit is 200*1024 and the column family flush threshold is
     // around 50*1024. We try to just hit the memstore limit with each CF's
     // memstore being below the CF flush threshold.
@@ -605,24 +626,32 @@ public class TestWalAndCompactingMemStoreFlush {
       region.put(createPut(5, i));
     }
 
-    region.flush(false);
-
-    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region 
WAL is: "
-        + smallestSeqInRegionCurrentMemstorePhaseV
-        + ". After additional inserts and last flush, the entire region size 
is:" + region
-        .getMemstoreSize()
-        + "\n----------------------------------\n";
+    long cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
+    long cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
+    long cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
 
+    
/*------------------------------------------------------------------------------*/
+    /* PHASE VI - Flush */
+    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
+    // None of the compacting memstores was flushed to memory by the previous puts,
+    // but each is going to be moved to the pipeline and flattened by this flush.
+    region.flush(false);
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores
-    // Also compacted memstores are flushed to disk.
-    assertEquals(0, region.getMemstoreSize());
-    System.out.println(s);
+    // Also compacted memstores are flushed to disk, but not entirely emptied
+    long cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
+    long cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
+    long cf5ActiveSizePhaseVII = 
region.getStore(FAMILIES[4]).getMemStoreSize();
+
+    assertTrue(cf1ActiveSizePhaseVII < cf1ActiveSizePhaseVI);
+    assertTrue(cf3ActiveSizePhaseVII < cf3ActiveSizePhaseVI);
+    assertTrue(cf5ActiveSizePhaseVII < cf5ActiveSizePhaseVI);
+
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
   @Test(timeout = 180000)
-  public void testSelectiveFlushWhenEnabledAndWALinCompaction() throws 
IOException {
+  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
@@ -631,9 +660,11 @@ public class TestWalAndCompactingMemStoreFlush {
     
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
 200 *
         1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 
0.5);
+    // set memstore to do data compaction and not to use the speculative scan
+    conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
 
     // Intialize the HRegion
-    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
+    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", 
conf);
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));
@@ -752,6 +783,253 @@ public class TestWalAndCompactingMemStoreFlush {
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
+  /**
+   * Checks the WAL sequence-id bookkeeping when CF1 and CF3 use index-compaction memstores.
+   * The same keys are written in two rounds; after each round CF1 and CF3 are flushed in
+   * memory and a selective flush is requested. The test then asserts that the smallest
+   * sequence ids of the WAL, of CF1 and of CF3 did not advance between the state before the
+   * second round of flushes and the state after it.
+   */
+  @Test(timeout = 180000)
+  public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+        FlushNonSloppyStoresFirstPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
+        200 * 1024);
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
+    // set memstore to do index compaction
+    conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
+
+    // Initialize the HRegion under this test's own name, as the sibling tests do
+    HRegion region = initHRegion("testSelectiveFlushAndWALinIndexCompaction", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+    // Now add more puts for CF2, so that we only flush CF2 to disk
+    for (int i = 100; i < 2000; i++) {
+      region.put(createPut(2, i));
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Some other sanity checks.
+    assertTrue(cf1MemstoreSizePhaseI > 0);
+    assertTrue(cf2MemstoreSizePhaseI > 0);
+    assertTrue(cf3MemstoreSizePhaseI > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(
+        totalMemstoreSize
+            + 1 * DefaultMemStore.DEEP_OVERHEAD
+            + 2 * CompactingMemStore.DEEP_OVERHEAD
+            + 3 * MutableSegment.DEEP_OVERHEAD,
+        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+
+    // First round: flush CF1 and CF3 in memory, then request a selective flush
+    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
+    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
+    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
+    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    region.flush(false);
+
+    long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
+
+    // CF2 should have been cleared
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+        cf2MemstoreSizePhaseII);
+
+    // Add same entries to compact them later
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+    // Now add more puts for CF2, so that we only flush CF2 to disk
+    for (int i = 100; i < 2000; i++) {
+      region.put(createPut(2, i));
+    }
+
+    // Sequence ids before the second round of flushes; compared against Phase IV below
+    long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL()
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
+
+    // Second round: flush CF1 and CF3 in memory again, then request a selective flush
+    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
+    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
+    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
+    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    region.flush(false);
+
+    long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL()
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
+
+    // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge
+    assertFalse(
+        smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
+    assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
+    assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
+
+    HBaseTestingUtility.closeRegionAndWAL(region);
+  }
+
+  /**
+   * Stress test for index-compaction memstores: 25 threads issue puts concurrently while the
+   * test thread forces two flushes to disk and then in-memory flushes of CF1 and CF3. The test
+   * has no explicit assertions; it passes when all put threads finish and the region closes
+   * within the timeout of 300 seconds (5 minutes).
+   */
+  @Test(timeout = 300000)
+  public void testStressFlushAndWALinIndexCompaction() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+        FlushNonSloppyStoresFirstPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
+        200 * 1024);
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
+    // set memstore to do index compaction
+    conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
+
+    // Initialize the HRegion under this test's own name, as the sibling tests do
+    HRegion region = initHRegion("testStressFlushAndWALinIndexCompaction", conf);
+
+    // Each thread gets its own start number, spaced 10000 apart
+    Thread[] threads = new Thread[25];
+    for (int i = 0; i < threads.length; i++) {
+      int id = i * 10000;
+      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
+      threads[i] = new Thread(runnable);
+      threads[i].start();
+    }
+    Threads.sleep(10000); // let other threads start
+    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
+    Threads.sleep(10000); // let other threads continue
+    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
+
+    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
+    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
+    // wait until the in-memory flushes of CF1 and CF3 are done
+    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
+        .isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+
+    for (Thread thread : threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and fail the test instead of silently carrying on;
+        // with the flag set, any further join() would throw again immediately.
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for the put threads to finish", e);
+      }
+    }
+
+    // Release the region and its WAL, as every other test in this class does
+    HBaseTestingUtility.closeRegionAndWAL(region);
+  }
+
+  /**
+   * Writer used by the stress test. It puts a batch of entries, numbered from its own start
+   * number, into the region via createPut(1..3, i). The writers whose start number is 0, 10000
+   * or 20000 additionally trigger an in-memory flush of FAMILY1, FAMILY2 or FAMILY3
+   * respectively and wait until that flush is no longer in progress.
+   */
+  private class ConcurrentPutRunnable implements Runnable {
+    private final HRegion stressedRegion;
+    private final int startNumber;
+
+    ConcurrentPutRunnable(HRegion r, int i) {
+      this.stressedRegion = r;
+      this.startNumber = i;
+    }
+
+    @Override
+    public void run() {
+      try {
+        int threadIndex = startNumber / 10000;
+        System.out.print(
+            "Thread " + threadIndex + " with start number " + startNumber + " starts\n");
+        // 3001 puts for CF1; the first 2001 numbers also go to CF2, the first 1001 also to CF3
+        for (int i = startNumber; i <= startNumber + 3000; i++) {
+          stressedRegion.put(createPut(1, i));
+          if (i <= startNumber + 2000) {
+            stressedRegion.put(createPut(2, i));
+            if (i <= startNumber + 1000) {
+              stressedRegion.put(createPut(3, i));
+            }
+          }
+        }
+        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
+        // Now add more puts for CF2, so that we only flush CF2 to disk
+        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
+          stressedRegion.put(createPut(2, i));
+        }
+        // And add more puts for CF1
+        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
+          stressedRegion.put(createPut(1, i));
+        }
+        System.out.print("Thread with start number " + startNumber + " flushes\n");
+        // flush (IN MEMORY) one of the stores: only the writers with start number 0, 10000 and
+        // 20000 do this, each for a different store, and wait till the flush is done
+        if (startNumber == 0) {
+          flushInMemoryAndWait((HStore) stressedRegion.getStore(FAMILY1));
+        } else if (startNumber == 10000) {
+          flushInMemoryAndWait((HStore) stressedRegion.getStore(FAMILY2));
+        } else if (startNumber == 20000) {
+          flushInMemoryAndWait((HStore) stressedRegion.getStore(FAMILY3));
+        }
+        System.out.print("Thread with start number " + startNumber + " finishes\n");
+      } catch (IOException e) {
+        // keep the cause; a bare "assert false" drops it and is a no-op without -ea
+        throw new AssertionError("Put thread with start number " + startNumber + " failed", e);
+      }
+    }
+
+    /**
+     * Triggers an in-memory flush on the given store's memstore and polls every 10 ms until
+     * isMemStoreFlushingInMemory() reports false.
+     */
+    private void flushInMemoryAndWait(HStore store) throws IOException {
+      CompactingMemStore memstore = (CompactingMemStore) store.memstore;
+      memstore.flushInMemory();
+      while (memstore.isMemStoreFlushingInMemory()) {
+        Threads.sleep(10);
+      }
+    }
+  }
+
   /** Returns the WAL of the given region after casting it to {@link HRegion}. */
   private WAL getWAL(Region region) {
     HRegion hregion = (HRegion) region;
     return hregion.getWAL();
   }

Reply via email to