Repository: hbase Updated Branches: refs/heads/master 1b12a6039 -> 988d1f9bc
http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 3405b49..277eb48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -36,11 +36,13 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -121,7 +123,7 @@ public class TestWalAndCompactingMemStoreFlush { } @Test(timeout = 180000) - public void testSelectiveFlushWhenEnabled() throws IOException { + public void testSelectiveFlushWithDataCompaction() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); @@ -130,9 +132,11 @@ public class TestWalAndCompactingMemStoreFlush { FlushNonSloppyStoresFirstPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); + // set memstore to do data compaction + conf.set("hbase.hregion.compacting.memstore.type", 
"data-compaction"); // Intialize the region - Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf); // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { @@ -313,7 +317,7 @@ public class TestWalAndCompactingMemStoreFlush { assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); // CF3 should be bottleneck for WAL - assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); // Flush!!!!!!!!!!!!!!!!!!!!!! // Trying to clean the existing memstores, CF2 all flushed to disk. The single @@ -330,7 +334,7 @@ public class TestWalAndCompactingMemStoreFlush { .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); assertTrue( - CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD < cf1MemstoreSizePhaseV); + CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD <= cf1MemstoreSizePhaseV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, @@ -371,9 +375,13 @@ public class TestWalAndCompactingMemStoreFlush { HBaseTestingUtility.closeRegionAndWAL(region); } + /*------------------------------------------------------------------------------*/ + /* Check the same as above but for index-compaction type of compacting memstore */ @Test(timeout = 180000) - public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException { + public void testSelectiveFlushWithIndexCompaction() throws IOException { + /*------------------------------------------------------------------------------*/ + /* SETUP */ // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); @@ -381,18 +389,17 @@ public class 
TestWalAndCompactingMemStoreFlush { FlushNonSloppyStoresFirstPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to index-compaction + conf.set("hbase.hregion.compacting.memstore.type", "index-compaction"); - // set memstore segment flattening to false and compact to skip-list - conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false); - conf.setInt("hbase.hregion.compacting.memstore.type",1); - - // Intialize the region - Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + // Initialize the region + Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); + /*------------------------------------------------------------------------------*/ + /* PHASE I - insertions */ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); // compacted memstore - if (i <= 100) { region.put(createPut(2, i)); if (i <= 50) { @@ -400,41 +407,32 @@ public class TestWalAndCompactingMemStoreFlush { } } } - // Now add more puts for CF2, so that we only flush CF2 to disk for (int i = 100; i < 2000; i++) { region.put(createPut(2, i)); } - long totalMemstoreSize = region.getMemstoreSize(); - + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE I - collect sizes */ + long totalMemstoreSizePhaseI = region.getMemstoreSize(); // Find the smallest LSNs for edits wrt to each CF. long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); - // Find the sizes of the memstores of each CF. 
long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); - // Get the overall smallest LSN in the region's memstores. long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - String s = "\n\n----------------------------------\n" - + "Upon initial insert and before any flush, size of CF1 is:" - + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" - + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:" - + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" - + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:" - + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:" - + region.getStore(FAMILY3).isSloppyMemstore() + "\n"; - + /*------------------------------------------------------------------------------*/ + /* PHASE I - validation */ // The overall smallest LSN in the region's memstores should be the same as // the LSN of the smallest edit in CF1 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); - // Some other sanity checks. assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); @@ -444,149 +442,169 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. 
- String msg = "totalMemstoreSize="+totalMemstoreSize + - " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + - " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + - " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; - assertEquals(msg, - totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD) - + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD), + assertEquals( + totalMemstoreSizePhaseI + + 1 * DefaultMemStore.DEEP_OVERHEAD + + 2 * CompactingMemStore.DEEP_OVERHEAD + + 3 * MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); - // Flush!!!!!!!!!!!!!!!!!!!!!! - // We have big compacting memstore CF1 and two small memstores: - // CF2 (not compacted) and CF3 (compacting) - // All together they are above the flush size lower bound. - // Since CF1 and CF3 should be flushed to memory (not to disk), - // CF2 is going to be flushed to disk. - // CF1 - nothing to compact, CF3 - should be twice compacted + /*------------------------------------------------------------------------------*/ + /* PHASE I - Flush */ + // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // CF1, CF2, CF3, all together they are above the flush size lower bound. + // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. 
+ // CF1 and CF3 - flushed to memory and flatten explicitly + region.flush(false); CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; cms1.flushInMemory(); cms3.flushInMemory(); - region.flush(false); + // CF3/CF1 should be merged so wait here to be sure the compaction is done + while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + while (((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE II - collect sizes */ // Recalculate everything long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); - long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // Find the smallest LSNs for edits wrt to each CF. - long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + long totalMemstoreSizePhaseII = region.getMemstoreSize(); - s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD - + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD - + "\n----After first flush! 
CF1 should be flushed to memory, but not compacted.---\n" - + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII - + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; - - // CF1 was flushed to memory, but there is nothing to compact, should - // remain the same size plus renewed empty skip-list - assertEquals(s, cf1MemstoreSizePhaseII, cf1MemstoreSizePhaseI - + ImmutableSegment.DEEP_OVERHEAD_CAM + CompactionPipeline.ENTRY_OVERHEAD); - + /*------------------------------------------------------------------------------*/ + /* PHASE II - validation */ + // CF1 was flushed to memory, should be flattened and take less space + assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI); // CF2 should become empty assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); - - // verify that CF3 was flushed to memory and was compacted (this is approximation check) - assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD - + ImmutableSegment.DEEP_OVERHEAD_CAM - + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII); - assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII); - + // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) + // if compacted, CF3 should be at least two times smaller because every one of its keys was duplicated + assertTrue(cf3MemstoreSizePhaseI / 2 < cf3MemstoreSizePhaseII); // Now the smallest LSN in the region should be the same as the smallest // LSN in the memstore of CF1. assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); - + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. 
Counting the empty active segments in CF1/2/3 and pipeline + // items in CF1/3 + assertEquals( + totalMemstoreSizePhaseII + + 1 * DefaultMemStore.DEEP_OVERHEAD + + 2 * CompactingMemStore.DEEP_OVERHEAD + + 3 * MutableSegment.DEEP_OVERHEAD + + 2 * CompactionPipeline.ENTRY_OVERHEAD + + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM, + cf1MemstoreSizePhaseII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII); + + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE III - insertions */ // Now add more puts for CF1, so that we also flush CF1 to disk instead of - // memory in next flush - for (int i = 1200; i < 2000; i++) { + // memory in next flush. This is causing the CF1 to be flushed to memory twice. + for (int i = 1200; i < 8000; i++) { region.put(createPut(1, i)); } - s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII - + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n"; + // CF1 should be flattened and merged so wait here to be sure the compaction is done + while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } - // How much does the CF1 memstore occupy? Will be used later. + /*------------------------------------------------------------------------------*/ + /* PHASE III - collect sizes */ + // How much does the CF1 memstore occupy now? Will be used later. 
long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); - long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); - - s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII - + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; + long totalMemstoreSizePhaseIII = region.getMemstoreSize(); - - // Flush!!!!!!!!!!!!!!!!!!!!!! - // Flush again, CF1 is flushed to disk - // CF2 is flushed to disk, because it is not in-memory compacted memstore - // CF3 is flushed empty to memory (actually nothing happens to CF3) + /*------------------------------------------------------------------------------*/ + /* PHASE III - validation */ + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline + // items in CF1/2 + assertEquals( + totalMemstoreSizePhaseIII + + 1 * DefaultMemStore.DEEP_OVERHEAD + + 2 * CompactingMemStore.DEEP_OVERHEAD + + 3 * MutableSegment.DEEP_OVERHEAD + + 2 * CompactionPipeline.ENTRY_OVERHEAD + + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM, + cf1MemstoreSizePhaseIII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII); + + /*------------------------------------------------------------------------------*/ + /* PHASE III - Flush */ + // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // CF1 is flushed to disk, but not entirely emptied. 
+ // CF2 was and remained empty, same way nothing happens to CF3 region.flush(false); + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE IV - collect sizes */ // Recalculate everything long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); - long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); - s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" - + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV - + "\n"; - - s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV - + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV - + "\n"; - - // CF1's pipeline component (inserted before first flush) should be flushed to disk - // CF2 should be flushed to disk - assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD - + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV); + /*------------------------------------------------------------------------------*/ + /* PHASE IV - validation */ + // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk + // CF2 should remain empty + assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + 
MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); - // CF3 shouldn't have been touched. assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); - // the smallest LSN of CF3 shouldn't change assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); - // CF3 should be bottleneck for WAL - assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); - // Flush!!!!!!!!!!!!!!!!!!!!!! - // Clearing the existing memstores, CF2 all flushed to disk. The single - // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. - // Note that active sets of CF1 and CF3 are empty + /*------------------------------------------------------------------------------*/ + /* PHASE IV - Flush */ + // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // Force flush to disk on all memstores (flush parameter true). + // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty region.flush(true); + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE V - collect sizes */ // Recalculate everything long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long totalMemstoreSizePhaseV = region.getMemstoreSize(); + /*------------------------------------------------------------------------------*/ + /* PHASE V - validation */ assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); 
assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); - + // The total memstores size should be empty + assertEquals(totalMemstoreSizePhaseV, 0); // Because there is nothing in any memstore the WAL's LSN should be -1 assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM); @@ -594,6 +612,9 @@ public class TestWalAndCompactingMemStoreFlush { // any Column Family above the threshold? // In that case, we should flush all the CFs. + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE VI - insertions */ // The memstore limit is 200*1024 and the column family flush threshold is // around 50*1024. We try to just hit the memstore limit with each CF's // memstore being below the CF flush threshold. @@ -605,24 +626,32 @@ public class TestWalAndCompactingMemStoreFlush { region.put(createPut(5, i)); } - region.flush(false); - - s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " - + smallestSeqInRegionCurrentMemstorePhaseV - + ". After additional inserts and last flush, the entire region size is:" + region - .getMemstoreSize() - + "\n----------------------------------\n"; + long cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); + long cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); + long cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); + /*------------------------------------------------------------------------------*/ + /* PHASE VI - Flush */ + // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // None among compacting memstores was flushed to memory due to previous puts. + // But is going to be moved to pipeline and flatten due to the flush. 
+ region.flush(false); // Since we won't find any CF above the threshold, and hence no specific // store to flush, we should flush all the memstores - // Also compacted memstores are flushed to disk. - assertEquals(0, region.getMemstoreSize()); - System.out.println(s); + // Also compacted memstores are flushed to disk, but not entirely emptied + long cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); + long cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); + long cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); + + assertTrue(cf1ActiveSizePhaseVII < cf1ActiveSizePhaseVI); + assertTrue(cf3ActiveSizePhaseVII < cf3ActiveSizePhaseVI); + assertTrue(cf5ActiveSizePhaseVII < cf5ActiveSizePhaseVI); + HBaseTestingUtility.closeRegionAndWAL(region); } @Test(timeout = 180000) - public void testSelectiveFlushWhenEnabledAndWALinCompaction() throws IOException { + public void testSelectiveFlushAndWALinDataCompaction() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); @@ -631,9 +660,11 @@ public class TestWalAndCompactingMemStoreFlush { conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); // Intialize the HRegion - HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf); + HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); @@ -752,6 +783,253 @@ public class TestWalAndCompactingMemStoreFlush { HBaseTestingUtility.closeRegionAndWAL(region); } + @Test(timeout = 180000) + public void 
testSelectiveFlushAndWALinIndexCompaction() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, + 200 * 1024); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to do index compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "index-compaction"); + + // Initialize the HRegion + HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + // Now add more puts for CF2, so that we only flush CF2 to disk + for (int i = 100; i < 2000; i++) { + region.put(createPut(2, i)); + } + + long totalMemstoreSize = region.getMemstoreSize(); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); + + // Some other sanity checks. + assertTrue(cf1MemstoreSizePhaseI > 0); + assertTrue(cf2MemstoreSizePhaseI > 0); + assertTrue(cf3MemstoreSizePhaseI > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + assertEquals( + totalMemstoreSize + + 1 * DefaultMemStore.DEEP_OVERHEAD + + 2 * CompactingMemStore.DEEP_OVERHEAD + + 3 * MutableSegment.DEEP_OVERHEAD, + cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); + + // Flush! 
+ ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); + ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); + // CF1 and CF3 should be compacted so wait here to be sure the compaction is done + while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + region.flush(false); + + long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); + + long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL() + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + + // CF2 should have been cleared + assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, + cf2MemstoreSizePhaseII); + + // Add same entries to compact them later + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + // Now add more puts for CF2, so that we only flush CF2 to disk + for (int i = 100; i < 2000; i++) { + region.put(createPut(2, i)); + } + + long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL() + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); + + // Flush! 
+ ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); + ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); + // CF1 and CF3 should be compacted so wait here to be sure the compaction is done + while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + region.flush(false); + + long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL() + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); + + // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge + assertFalse( + smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); + assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); + assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); + + HBaseTestingUtility.closeRegionAndWAL(region); + } + + // should end in 300 seconds (5 minutes) + @Test(timeout = 300000) + public void testStressFlushAndWALinIndexCompaction() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, + 200 * 1024); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to do data compaction and not to use the speculative scan + 
conf.set("hbase.hregion.compacting.memstore.type", "index-compaction"); + + // Successfully initialize the HRegion + HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); + + Thread[] threads = new Thread[25]; + for (int i = 0; i < threads.length; i++) { + int id = i * 10000; + ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id); + threads[i] = new Thread(runnable); + threads[i].start(); + } + Threads.sleep(10000); // let other threads start + region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts + Threads.sleep(10000); // let other threads continue + region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts + + ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); + ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); + while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + + for (int i = 0; i < threads.length; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per + * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, + * releases updatesLock and compacts the pipeline. 
+ */ + private class ConcurrentPutRunnable implements Runnable { + private final HRegion stressedRegion; + private final int startNumber; + + ConcurrentPutRunnable(HRegion r, int i) { + this.stressedRegion = r; + this.startNumber = i; + } + + @Override + public void run() { + + try { + int dummy = startNumber / 10000; + System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n"); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = startNumber; i <= startNumber + 3000; i++) { + stressedRegion.put(createPut(1, i)); + if (i <= startNumber + 2000) { + stressedRegion.put(createPut(2, i)); + if (i <= startNumber + 1000) { + stressedRegion.put(createPut(3, i)); + } + } + } + System.out.print("Thread with start number " + startNumber + " continues to more puts\n"); + // Now add more puts for CF2, so that we only flush CF2 to disk + for (int i = startNumber + 3000; i < startNumber + 5000; i++) { + stressedRegion.put(createPut(2, i)); + } + // And add more puts for CF1 + for (int i = startNumber + 5000; i < startNumber + 7000; i++) { + stressedRegion.put(createPut(1, i)); + } + System.out.print("Thread with start number " + startNumber + " flushes\n"); + // flush (IN MEMORY) one of the stores (each thread flushes different store) + // and wait till the flush and the following action are done + if (startNumber == 0) { + ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) + .flushInMemory(); + while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + } + if (startNumber == 10000) { + ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore).flushInMemory(); + while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + } + if (startNumber == 20000) { + ((CompactingMemStore) ((HStore) 
stressedRegion.getStore(FAMILY3)).memstore).flushInMemory(); + while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) + .isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + } + System.out.print("Thread with start number " + startNumber + " finishes\n"); + } catch (IOException e) { + assert false; + } + } + } + private WAL getWAL(Region region) { return ((HRegion)region).getWAL(); }