Repository: hbase Updated Branches: refs/heads/master cb02be38a -> 32c21f459
http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 1191f30..8cf0a7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -305,10 +305,6 @@ public class SegmentScanner implements KeyValueScanner { // do nothing } - protected Segment getSegment(){ - return segment; - } - //debug method @Override public String toString() { @@ -320,6 +316,10 @@ public class SegmentScanner implements KeyValueScanner { /********************* Private Methods **********************/ + private Segment getSegment(){ + return segment; + } + /** * Private internal method for iterating over the segment, * skipping the cells with irrelevant MVCC http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java index 9d7a723..2e8bead 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java @@ -38,8 +38,7 @@ public class VersionedSegmentsList { private final LinkedList<ImmutableSegment> storeSegments; private final long version; - public VersionedSegmentsList( - LinkedList<ImmutableSegment> storeSegments, long version) { + public 
VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) { this.storeSegments = storeSegments; this.version = version; } @@ -51,4 +50,16 @@ public class VersionedSegmentsList { public long getVersion() { return version; } + + public int getNumOfCells() { + int totalCells = 0; + for (ImmutableSegment s : storeSegments) { + totalCells += s.getCellsCount(); + } + return totalCells; + } + + public int getNumOfSegments() { + return storeSegments.size(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 09e2271..e7d6661 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -28,11 +28,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.io.hfile.LruCachedBlock; -import org.apache.hadoop.hbase.regionserver.CellSet; -import org.apache.hadoop.hbase.regionserver.DefaultMemStore; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; @@ -174,6 +170,15 @@ public class TestHeapSize { assertEquals(expected, actual); } + // CellArrayMap + cl = CellArrayMap.class; + expected = ClassSize.estimateBase(cl, false); + actual = ClassSize.CELL_ARRAY_MAP; + if(expected != actual) { 
+ ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + // ReentrantReadWriteLock cl = ReentrantReadWriteLock.class; expected = ClassSize.estimateBase(cl, false); @@ -240,7 +245,7 @@ public class TestHeapSize { // CellSet cl = CellSet.class; expected = ClassSize.estimateBase(cl, false); - actual = ClassSize.CELL_SKIPLIST_SET; + actual = ClassSize.CELL_SET; if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java new file mode 100644 index 0000000..cd5788e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.experimental.categories.Category; + +import java.util.Iterator; +import java.util.NavigableMap; +import java.util.SortedSet; +import static org.junit.Assert.assertTrue; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestCellFlatSet extends TestCase { + + private static final int NUM_OF_CELLS = 4; + + private Cell cells[]; + private CellArrayMap cbOnHeap; + + private final static Configuration conf = new Configuration(); + private HeapMemStoreLAB mslab; + + + protected void setUp() throws Exception { + super.setUp(); + + // create array of Cells to pass to the CellFlatMap under CellSet + final byte[] one = Bytes.toBytes(15); + final byte[] two = Bytes.toBytes(25); + final byte[] three = Bytes.toBytes(35); + final byte[] four = Bytes.toBytes(45); + + final byte[] f = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); + final byte[] v = Bytes.toBytes(4); + + final KeyValue kv1 = new KeyValue(one, f, q, 10, v); + final KeyValue kv2 = new KeyValue(two, f, q, 20, v); + final KeyValue kv3 = new KeyValue(three, f, q, 30, v); + final KeyValue kv4 = new KeyValue(four, f, q, 40, v); + + cells = new Cell[] {kv1,kv2,kv3,kv4}; + cbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,cells,0,NUM_OF_CELLS,false); + + conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); + conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); + MemStoreChunkPool.chunkPoolDisabled = false; + mslab = new HeapMemStoreLAB(conf); + } + + /* Create and test CellSet based on CellArrayMap */ + public void testCellBlocksOnHeap() throws Exception { + CellSet cs = new CellSet(cbOnHeap); + testCellBlocks(cs); + 
testIterators(cs); + } + + /* Generic basic test for immutable CellSet */ + private void testCellBlocks(CellSet cs) throws Exception { + final byte[] oneAndHalf = Bytes.toBytes(20); + final byte[] f = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); + final byte[] v = Bytes.toBytes(4); + final KeyValue outerCell = new KeyValue(oneAndHalf, f, q, 10, v); + + assertEquals(NUM_OF_CELLS, cs.size()); // check size + assertFalse(cs.contains(outerCell)); // check outer cell + + assertTrue(cs.contains(cells[0])); // check existence of the first + Cell first = cs.first(); + assertTrue(cells[0].equals(first)); + + assertTrue(cs.contains(cells[NUM_OF_CELLS - 1])); // check last + Cell last = cs.last(); + assertTrue(cells[NUM_OF_CELLS - 1].equals(last)); + + SortedSet<Cell> tail = cs.tailSet(cells[1]); // check tail and head sizes + assertEquals(NUM_OF_CELLS - 1, tail.size()); + SortedSet<Cell> head = cs.headSet(cells[1]); + assertEquals(1, head.size()); + + SortedSet<Cell> tailOuter = cs.tailSet(outerCell); // check tail starting from outer cell + assertEquals(NUM_OF_CELLS - 1, tailOuter.size()); + + Cell tailFirst = tail.first(); + assertTrue(cells[1].equals(tailFirst)); + Cell tailLast = tail.last(); + assertTrue(cells[NUM_OF_CELLS - 1].equals(tailLast)); + + Cell headFirst = head.first(); + assertTrue(cells[0].equals(headFirst)); + Cell headLast = head.last(); + assertTrue(cells[0].equals(headLast)); + } + + /* Generic iterators test for immutable CellSet */ + private void testIterators(CellSet cs) throws Exception { + + // Assert that we have NUM_OF_CELLS values and that they are in order + int count = 0; + for (Cell kv: cs) { + assertEquals("\n\n-------------------------------------------------------------------\n" + + "Comparing iteration number " + (count + 1) + " the returned cell: " + kv + + ", the first Cell in the CellBlocksMap: " + cells[count] + + ", and the same transformed to String: " + cells[count].toString() + + 
"\n-------------------------------------------------------------------\n", + cells[count], kv); + count++; + } + assertEquals(NUM_OF_CELLS, count); + + // Test descending iterator + count = 0; + for (Iterator<Cell> i = cs.descendingIterator(); i.hasNext();) { + Cell kv = i.next(); + assertEquals(cells[NUM_OF_CELLS - (count + 1)], kv); + count++; + } + assertEquals(NUM_OF_CELLS, count); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index c5aae00..db0205e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -58,15 +58,15 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - private static MemStoreChunkPool chunkPool; - private HRegion region; - private RegionServicesForStores regionServicesForStores; - private HStore store; + protected static MemStoreChunkPool chunkPool; + protected HRegion region; + protected RegionServicesForStores regionServicesForStores; + protected HStore store; ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// - private static byte[] makeQualifier(final int i1, final int i2) { + protected static byte[] makeQualifier(final int i1, final int i2) { return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); } @@ -79,6 +79,12 @@ public class 
TestCompactingMemStore extends TestDefaultMemStore { @Override @Before public void setUp() throws Exception { + compactingSetUp(); + this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR, + store, regionServicesForStores); + } + + protected void compactingSetUp() throws Exception { super.internalSetUp(); Configuration conf = new Configuration(); conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); @@ -89,13 +95,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); - this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR, - store, regionServicesForStores); + chunkPool = MemStoreChunkPool.getPool(conf); assertTrue(chunkPool != null); } - /** * A simple test which verifies the 3 possible states when scanning across snapshot. * @@ -597,7 +601,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot @@ -624,11 +628,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(1000); } + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3, counter); assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys2); - assertEquals(752, 
regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact @@ -636,7 +645,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot @@ -667,7 +676,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys2); @@ -675,16 +684,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { region.getMemstoreSize() + ", Memstore Total Size: " + regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n"; - assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); ((CompactingMemStore)memstore).disableCompaction(); size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys3); - assertEquals(1128, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize()); ((CompactingMemStore)memstore).enableCompaction(); 
size = memstore.getFlushableSize(); @@ -693,7 +702,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java new file mode 100644 index 0000000..1933343 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -0,0 +1,361 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.util.ArrayList; +import java.util.List; + + + +/** + * compacted memstore test case + */ +@Category({RegionServerTests.class, MediumTests.class}) +public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { + + private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); + //private static MemStoreChunkPool chunkPool; + //private HRegion region; + //private RegionServicesForStores regionServicesForStores; + //private HStore store; + + ////////////////////////////////////////////////////////////////////////////// + // Helpers + ////////////////////////////////////////////////////////////////////////////// + + @Override public void tearDown() throws Exception { + chunkPool.clearChunks(); + } + + @Override public void setUp() throws Exception { + compactingSetUp(); + Configuration conf = HBaseConfiguration.create(); + + conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap + + this.memstore = + new CompactingMemStore(conf, CellComparator.COMPARATOR, store, + regionServicesForStores); + } + 
+ ////////////////////////////////////////////////////////////////////////////// + // Compaction tests + ////////////////////////////////////////////////////////////////////////////// + public void testCompaction1Bucket() throws IOException { + int counter = 0; + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 + + // test 1 bucket + addRowsByKeys(memstore, keys1); + assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + + assertEquals(4, memstore.getActive().getCellsCount()); + long size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3, counter); + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.addAndGetGlobalMemstoreSize(-size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(3, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + + public void testCompaction2Buckets() throws IOException { + + String[] keys1 = { "A", "A", "B", "C" }; + String[] keys2 = { "A", "B", "D" }; + + addRowsByKeys(memstore, keys1); + assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + long size = memstore.getFlushableSize(); + +// assertTrue( +// "\n\n<<< This is the active size with 4 keys - " + memstore.getActive().getSize() +// + ". 
This is the memstore flushable size - " + size + "\n",false); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(1000); + } + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3,counter); + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); + + addRowsByKeys(memstore, keys2); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); + + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + int i = 0; + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + if (i > 10000000) { + ((CompactingMemStore) memstore).debug(); + assertTrue("\n\n<<< Infinite loop! :( \n", false); + } + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(4,counter); + assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize()); + + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.addAndGetGlobalMemstoreSize(-size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(4, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + + public void testCompaction3Buckets() throws IOException { + + String[] keys1 = { "A", "A", "B", "C" }; + String[] keys2 = { "A", "B", "D" }; + String[] keys3 = { "D", "B", "B" }; + + addRowsByKeys(memstore, keys1); + assertEquals(496, region.getMemstoreSize()); + + long size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // 
push keys to pipeline and compact + + String tstStr = "\n\nFlushable size after first flush in memory:" + size + ". Is MemmStore in compaction?:" + + ((CompactingMemStore) memstore).isMemStoreFlushingInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); + + addRowsByKeys(memstore, keys2); + + tstStr += " After adding second part of the keys. Memstore size: " + + region.getMemstoreSize() + ", Memstore Total Size: " + + regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n"; + + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); + + ((CompactingMemStore) memstore).disableCompaction(); + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); + + addRowsByKeys(memstore, keys3); + assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize()); + + ((CompactingMemStore) memstore).enableCompaction(); + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize()); + + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.addAndGetGlobalMemstoreSize(-size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(4, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); + + memstore.clearSnapshot(snapshot.getId()); + + } + + 
////////////////////////////////////////////////////////////////////////////// + // Flattening tests + ////////////////////////////////////////////////////////////////////////////// + @Test + public void testFlattening() throws IOException { + + String[] keys1 = { "A", "A", "B", "C", "F", "H"}; + String[] keys2 = { "A", "B", "D", "G", "I", "J"}; + String[] keys3 = { "D", "B", "B", "E" }; + + // set flattening to true + memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); + + addRowsByKeys(memstore, keys1); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact + + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + addRowsByKeys(memstore, keys2); // also should only flatten + + ((CompactingMemStore) memstore).disableCompaction(); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + addRowsByKeys(memstore, keys3); + + ((CompactingMemStore) memstore).enableCompaction(); + + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(10,counter); + + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + ImmutableSegment s = memstore.getSnapshot(); + memstore.clearSnapshot(snapshot.getId()); + } + + @Test + public void testCountOfCellsAfterFlatteningByScan() throws IOException { + String[] keys1 = { "A", "B", "C" }; // A, B, C + addRowsByKeysWith50Cols(memstore, keys1); + // this should only flatten as there are no duplicates + ((CompactingMemStore) 
memstore).flushInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); + MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners); + int count = 0; + while (scanner.next() != null) { + count++; + } + assertEquals("the count should be ", count, 150); + scanner.close(); + } + + @Test + public void testCountOfCellsAfterFlatteningByIterator() throws IOException { + String[] keys1 = { "A", "B", "C" }; // A, B, C + addRowsByKeysWith50Cols(memstore, keys1); + // this should only flatten as there are no duplicates + ((CompactingMemStore) memstore).flushInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + // Just doing the cnt operation here + MemStoreCompactorIterator itr = new MemStoreCompactorIterator( + ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), + CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); + int cnt = 0; + try { + while (itr.next() != null) { + cnt++; + } + } finally { + itr.close(); + } + assertEquals("the count should be ", cnt, 150); + } + + + private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) { + byte[] fam = Bytes.toBytes("testfamily"); + for (int i = 0; i < keys.length; i++) { + long timestamp = System.currentTimeMillis(); + Threads.sleep(1); // to make sure each kv gets a different ts + byte[] row = Bytes.toBytes(keys[i]); + for(int j =0 ;j < 50; j++) { + byte[] qf = Bytes.toBytes("testqualifier"+j); + byte[] val = Bytes.toBytes(keys[i] + j); + KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); + hmc.add(kv); + } + } + } + + private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) { + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf = Bytes.toBytes("testqualifier"); + long size = hmc.getActive().getSize();// + for (int i = 0; i < keys.length; 
i++) { + long timestamp = System.currentTimeMillis(); + Threads.sleep(1); // to make sure each kv gets a different ts + byte[] row = Bytes.toBytes(keys[i]); + byte[] val = Bytes.toBytes(keys[i] + i); + KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); + hmc.add(kv); + LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); + } + regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size);// + } + + private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { + long t = 1234; + + @Override public long currentTime() { + return t; + } + + public void setCurrentTimeMillis(long t) { + this.t = t; + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 765c9cf..2042f52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -5109,7 +5109,7 @@ public class TestHRegion { * * @throws IOException */ - private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) + protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) throws IOException { byte[][] families = { fs }; Scan scan = new Scan(); @@ -5172,7 +5172,7 @@ public class TestHRegion { } } - private Configuration initSplit() { + protected Configuration initSplit() { // Always compact if there is more than one store file. 
CONF.setInt("hbase.hstore.compactionThreshold", 2); http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index be604af..d66899b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.TreeMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; @@ -30,11 +33,19 @@ import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence; import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.junit.ClassRule; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; +import static 
org.apache.hadoop.hbase.HBaseTestingUtility.fam3; +import static org.junit.Assert.assertNotNull; + /** * A test similar to TestHRegion, but with in-memory flush families. * Also checks wal truncation after in-memory compaction. @@ -65,5 +76,88 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion{ isReadOnly, durability, wal, inMemory, families); } + /** + * Splits twice and verifies getting from each of the split regions. + * + * @throws Exception + */ + @Override + public void testBasicSplit() throws Exception { + byte[][] families = { fam1, fam2, fam3 }; + + Configuration hc = initSplit(); + // Setting up region + String method = this.getName(); + this.region = initHRegion(tableName, method, hc, families); + + try { + LOG.info("" + HBaseTestCase.addContent(region, fam3)); + region.flush(true); + region.compactStores(); + byte[] splitRow = region.checkSplit(); + assertNotNull(splitRow); + LOG.info("SplitRow: " + Bytes.toString(splitRow)); + HRegion[] regions = splitRegion(region, splitRow); + try { + // Need to open the regions. + // TODO: Add an 'open' to HRegion... don't do open by constructing + // instance. + for (int i = 0; i < regions.length; i++) { + regions[i] = HRegion.openHRegion(regions[i], null); + } + // Assert can get rows out of new regions. Should be able to get first + // row from first region and the midkey from second region. + assertGet(regions[0], fam3, Bytes.toBytes(START_KEY)); + assertGet(regions[1], fam3, splitRow); + // Test I can get scanner and that it starts at right place. + assertScan(regions[0], fam3, Bytes.toBytes(START_KEY)); + assertScan(regions[1], fam3, splitRow); + // Now prove can't split regions that have references. + for (int i = 0; i < regions.length; i++) { + // Add so much data to this region, we create a store file that is > + // than one of our unsplitable references. it will. 
+ for (int j = 0; j < 2; j++) { + HBaseTestCase.addContent(regions[i], fam3); + } + HBaseTestCase.addContent(regions[i], fam2); + HBaseTestCase.addContent(regions[i], fam1); + regions[i].flush(true); + } + + byte[][] midkeys = new byte[regions.length][]; + // To make regions splitable force compaction. + for (int i = 0; i < regions.length; i++) { + regions[i].compactStores(); + midkeys[i] = regions[i].checkSplit(); + } + + TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>(); + // Split these two daughter regions so then I'll have 4 regions. Will + // split because added data above. + for (int i = 0; i < regions.length; i++) { + HRegion[] rs = null; + if (midkeys[i] != null) { + rs = splitRegion(regions[i], midkeys[i]); + for (int j = 0; j < rs.length; j++) { + sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()), + HRegion.openHRegion(rs[j], null)); + } + } + } + LOG.info("Made 4 regions"); + } finally { + for (int i = 0; i < regions.length; i++) { + try { + regions[i].close(); + } catch (IOException e) { + // Ignore. 
+ } + } + } + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 2acfd12..a6c7912 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -135,12 +135,264 @@ public class TestWalAndCompactingMemStoreFlush { // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class - .getName()); - conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * - 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); + + // Initialize the region + Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); // compacted memstore, all the keys are unique + + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + // compacted memstore, subject for compaction due to duplications + region.put(createDoublePut(3, i)); + } + } + } + + // Now add 
more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk + for (int i = 100; i < 2000; i++) { + region.put(createPut(2, i)); + } + + long totalMemstoreSize = region.getMemstoreSize(); + + // Find the smallest LSNs for edits wrt to each CF. + long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); + + // Get the overall smallest LSN in the region's memstores. + long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + String s = "\n\n----------------------------------\n" + + "Upon initial insert and before any flush, size of CF1 is:" + + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" + + region.getStore(FAMILY1).getMemStore().isSloppy() + ". Size of CF2 is:" + + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" + + region.getStore(FAMILY2).getMemStore().isSloppy() + ". Size of CF3 is:" + + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:" + + region.getStore(FAMILY3).getMemStore().isSloppy() + "\n"; + + // The overall smallest LSN in the region's memstores should be the same as + // the LSN of the smallest edit in CF1 + assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); + + // Some other sanity checks. 
+ assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); + assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); + assertTrue(cf1MemstoreSizePhaseI > 0); + assertTrue(cf2MemstoreSizePhaseI > 0); + assertTrue(cf3MemstoreSizePhaseI > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + String msg = "totalMemstoreSize="+totalMemstoreSize + + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + + " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + + + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; + assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, + cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); + + // Flush!!!!!!!!!!!!!!!!!!!!!! + // We have big compacting memstore CF1 and two small memstores: + // CF2 (not compacted) and CF3 (compacting) + // All together they are above the flush size lower bound. + // Since CF1 and CF3 should be flushed to memory (not to disk), + // CF2 is going to be flushed to disk. 
+ // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted + ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); + ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); + region.flush(false); + + // CF3 should be compacted so wait here to be sure the compaction is done + while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + + // Recalculate everything + long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); + + long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + // Find the smallest LSNs for edits wrt to each CF. + long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + + s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD + + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD + + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore + .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" + + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII + + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; + + // CF1 was flushed to memory, but there is nothing to compact, and CF1 
was flattened + assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI); + + // CF2 should become empty + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); + + // verify that CF3 was flushed to memory and was compacted (this is approximation check) + assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD + + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM > + cf3MemstoreSizePhaseII); + + // CF3 was compacted and flattened! + assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI + + ", size of CF3 in phase II - " + cf3MemstoreSizePhaseII + "\n", + cf3MemstoreSizePhaseI / 2 > cf3MemstoreSizePhaseII); + + + // Now the smallest LSN in the region should be the same as the smallest + // LSN in the memstore of CF1. + assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); + + // Now add more puts for CF1, so that we also flush CF1 to disk instead of + // memory in next flush + for (int i = 1200; i < 3000; i++) { + region.put(createPut(1, i)); + } + + s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII + + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " + + "the smallest sequence in CF2:" + + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n"; + + // How much does the CF1 memstore occupy? Will be used later. + long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); + long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); + + s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII + + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; + + + // Flush!!!!!!!!!!!!!!!!!!!!!! 
+ // Flush again, CF1 is flushed to disk + // CF2 is flushed to disk, because it is not in-memory compacted memstore + // CF3 is flushed empty to memory (actually nothing happens to CF3) + region.flush(false); + + // Recalculate everything + long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); + + long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); + + s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" + + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + + "\n"; + + s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV + + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + + "the smallest sequence in CF2:" + + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + + "\n"; + + // CF1's pipeline component (inserted before first flush) should be flushed to disk + // CF2 should be flushed to disk + assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); + + // CF3 shouldn't have been touched. + assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); + + // the smallest LSN of CF3 shouldn't change + assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); + + // CF3 should be bottleneck for WAL + assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + + // Flush!!!!!!!!!!!!!!!!!!!!!! 
+ // Trying to clean the existing memstores, CF2 all flushed to disk. The single + // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. + // Note that active set of CF3 is empty + // But active set of CF1 is not yet empty + region.flush(true); + + // Recalculate everything + long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); + long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + assertTrue(DefaultMemStore.DEEP_OVERHEAD < cf1MemstoreSizePhaseV); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); + + region.flush(true); // flush once again in order to be sure that everything is empty + assertEquals(DefaultMemStore.DEEP_OVERHEAD, region.getStore(FAMILY1).getMemStoreSize()); + + // What happens when we hit the memstore limit, but we are not able to find + // any Column Family above the threshold? + // In that case, we should flush all the CFs. + + // The memstore limit is 200*1024 and the column family flush threshold is + // around 50*1024. We try to just hit the memstore limit with each CF's + // memstore being below the CF flush threshold. + for (int i = 1; i <= 300; i++) { + region.put(createPut(1, i)); + region.put(createPut(2, i)); + region.put(createPut(3, i)); + region.put(createPut(4, i)); + region.put(createPut(5, i)); + } + + region.flush(false); + + s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " + + smallestSeqInRegionCurrentMemstorePhaseV + + ". 
After additional inserts and last flush, the entire region size is:" + region + .getMemstoreSize() + + "\n----------------------------------\n"; + + // Since we won't find any CF above the threshold, and hence no specific + // store to flush, we should flush all the memstores + // Also compacted memstores are flushed to disk. + assertEquals(0, region.getMemstoreSize()); + System.out.println(s); + HBaseTestingUtility.closeRegionAndWAL(region); + } + + @Test(timeout = 180000) + public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException { + + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore segment flattening to false and compact to skip-list + conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false); + conf.setInt("hbase.hregion.compacting.memstore.type",1); + // Intialize the region Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); @@ -201,7 +453,8 @@ public class TestWalAndCompactingMemStoreFlush { // memstores of CF1, CF2 and CF3. 
String msg = "totalMemstoreSize="+totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM + + " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; @@ -238,8 +491,8 @@ public class TestWalAndCompactingMemStoreFlush { s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD - + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_ITEM is:" + CompactingMemStore - .DEEP_OVERHEAD_PER_PIPELINE_ITEM + + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore + .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; @@ -247,14 +500,14 @@ public class TestWalAndCompactingMemStoreFlush { // CF1 was flushed to memory, but there is nothing to compact, should // remain the same size plus renewed empty skip-list assertEquals(s, cf1MemstoreSizePhaseII, - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM); // CF2 should become empty assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); // verify that CF3 was flushed to memory and was compacted (this is approximation check) assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD + - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM > + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM > cf3MemstoreSizePhaseII); 
assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII); @@ -322,7 +575,7 @@ public class TestWalAndCompactingMemStoreFlush { assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); // CF3 should be bottleneck for WAL - assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); // Flush!!!!!!!!!!!!!!!!!!!!!! // Clearing the existing memstores, CF2 all flushed to disk. The single @@ -419,7 +672,8 @@ public class TestWalAndCompactingMemStoreFlush { // memstores of CF1, CF2 and CF3. String msg = "totalMemstoreSize="+totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM + + " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-shell/src/main/ruby/hbase/admin.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index e4f52e9..67dde53 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -815,7 +815,8 @@ module Hbase family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE) family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE) family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if 
arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY) - family.setCompacted(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION) + family.setInMemoryCompaction( + JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION) family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL) family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING) family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)