Repository: hbase
Updated Branches:
  refs/heads/master cb02be38a -> 32c21f459


http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 1191f30..8cf0a7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -305,10 +305,6 @@ public class SegmentScanner implements KeyValueScanner {
     // do nothing
   }
 
-  protected Segment getSegment(){
-    return segment;
-  }
-
   //debug method
   @Override
   public String toString() {
@@ -320,6 +316,10 @@ public class SegmentScanner implements KeyValueScanner {
 
   /********************* Private Methods **********************/
 
+  private Segment getSegment(){
+    return segment;
+  }
+
   /**
    * Private internal method for iterating over the segment,
    * skipping the cells with irrelevant MVCC

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 9d7a723..2e8bead 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -38,8 +38,7 @@ public class VersionedSegmentsList {
   private final LinkedList<ImmutableSegment> storeSegments;
   private final long version;
 
-  public VersionedSegmentsList(
-          LinkedList<ImmutableSegment> storeSegments, long version) {
+  public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) {
     this.storeSegments = storeSegments;
     this.version = version;
   }
@@ -51,4 +50,16 @@ public class VersionedSegmentsList {
   public long getVersion() {
     return version;
   }
+
+  public int getNumOfCells() {
+    int totalCells = 0;
+    for (ImmutableSegment s : storeSegments) {
+      totalCells += s.getCellsCount();
+    }
+    return totalCells;
+  }
+
+  public int getNumOfSegments() {
+    return storeSegments.size();
+  }
 }
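For reviewers, the point of the two new accessors is that pipeline consumers no longer need to walk the LinkedList themselves. A minimal sketch of the intended use, assuming a VersionedSegmentsList obtained from CompactingMemStore#getImmutableSegments() (the accessor the new tests below call); the helper name and threshold are illustrative, not part of this patch:

    // Sketch only: decide whether the compaction pipeline is worth compacting/flattening.
    static boolean worthCompacting(VersionedSegmentsList versionedList) {
      final int NUM_CELLS_THRESHOLD = 1000; // hypothetical cutoff, not from this patch
      return versionedList.getNumOfSegments() > 1              // more than one segment queued
          || versionedList.getNumOfCells() > NUM_CELLS_THRESHOLD; // or enough cells to matter
    }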

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 09e2271..e7d6661 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -28,11 +28,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
-import org.apache.hadoop.hbase.regionserver.CellSet;
-import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.*;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -174,6 +170,15 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
+    // CellArrayMap
+    cl = CellArrayMap.class;
+    expected = ClassSize.estimateBase(cl, false);
+    actual = ClassSize.CELL_ARRAY_MAP;
+    if(expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+
     // ReentrantReadWriteLock
     cl = ReentrantReadWriteLock.class;
     expected = ClassSize.estimateBase(cl, false);
@@ -240,7 +245,7 @@ public class TestHeapSize  {
     // CellSet
     cl = CellSet.class;
     expected = ClassSize.estimateBase(cl, false);
-    actual = ClassSize.CELL_SKIPLIST_SET;
+    actual = ClassSize.CELL_SET;
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
new file mode 100644
index 0000000..cd5788e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.experimental.categories.Category;
+
+import java.util.Iterator;
+import java.util.NavigableMap;
+import java.util.SortedSet;
+import static org.junit.Assert.assertTrue;
+
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestCellFlatSet extends TestCase {
+
+  private static final int NUM_OF_CELLS = 4;
+
+  private Cell cells[];
+  private CellArrayMap cbOnHeap;
+
+  private final static Configuration conf = new Configuration();
+  private HeapMemStoreLAB mslab;
+
+
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    // create array of Cells to pass to the CellFlatMap under CellSet
+    final byte[] one = Bytes.toBytes(15);
+    final byte[] two = Bytes.toBytes(25);
+    final byte[] three = Bytes.toBytes(35);
+    final byte[] four = Bytes.toBytes(45);
+
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(4);
+
+    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
+    final KeyValue kv2 = new KeyValue(two, f, q, 20, v);
+    final KeyValue kv3 = new KeyValue(three, f, q, 30, v);
+    final KeyValue kv4 = new KeyValue(four, f, q, 40, v);
+
+    cells = new Cell[] {kv1,kv2,kv3,kv4};
+    cbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,cells,0,NUM_OF_CELLS,false);
+
+    conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
+    conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+    MemStoreChunkPool.chunkPoolDisabled = false;
+    mslab = new HeapMemStoreLAB(conf);
+  }
+
+  /* Create and test CellSet based on CellArrayMap */
+  public void testCellBlocksOnHeap() throws Exception {
+    CellSet cs = new CellSet(cbOnHeap);
+    testCellBlocks(cs);
+    testIterators(cs);
+  }
+
+  /* Generic basic test for immutable CellSet */
+  private void testCellBlocks(CellSet cs) throws Exception {
+    final byte[] oneAndHalf = Bytes.toBytes(20);
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(4);
+    final KeyValue outerCell = new KeyValue(oneAndHalf, f, q, 10, v);
+
+    assertEquals(NUM_OF_CELLS, cs.size());          // check size
+    assertFalse(cs.contains(outerCell));            // check outer cell
+
+    assertTrue(cs.contains(cells[0]));              // check existence of the first
+    Cell first = cs.first();
+    assertTrue(cells[0].equals(first));
+
+    assertTrue(cs.contains(cells[NUM_OF_CELLS - 1]));  // check last
+    Cell last = cs.last();
+    assertTrue(cells[NUM_OF_CELLS - 1].equals(last));
+
+    SortedSet<Cell> tail = cs.tailSet(cells[1]);    // check tail and head sizes
+    assertEquals(NUM_OF_CELLS - 1, tail.size());
+    SortedSet<Cell> head = cs.headSet(cells[1]);
+    assertEquals(1, head.size());
+
+    SortedSet<Cell> tailOuter = cs.tailSet(outerCell);  // check tail starting from outer cell
+    assertEquals(NUM_OF_CELLS - 1, tailOuter.size());
+
+    Cell tailFirst = tail.first();
+    assertTrue(cells[1].equals(tailFirst));
+    Cell tailLast = tail.last();
+    assertTrue(cells[NUM_OF_CELLS - 1].equals(tailLast));
+
+    Cell headFirst = head.first();
+    assertTrue(cells[0].equals(headFirst));
+    Cell headLast = head.last();
+    assertTrue(cells[0].equals(headLast));
+  }
+
+  /* Generic iterators test for immutable CellSet */
+  private void testIterators(CellSet cs) throws Exception {
+
+    // Assert that we have NUM_OF_CELLS values and that they are in order
+    int count = 0;
+    for (Cell kv: cs) {
+      assertEquals("\n\n-------------------------------------------------------------------\n"
+              + "Comparing iteration number " + (count + 1) + " the returned cell: " + kv
+              + ", the first Cell in the CellBlocksMap: " + cells[count]
+              + ", and the same transformed to String: " + cells[count].toString()
+              + "\n-------------------------------------------------------------------\n",
+              cells[count], kv);
+      count++;
+    }
+    assertEquals(NUM_OF_CELLS, count);
+
+    // Test descending iterator
+    count = 0;
+    for (Iterator<Cell> i = cs.descendingIterator(); i.hasNext();) {
+      Cell kv = i.next();
+      assertEquals(cells[NUM_OF_CELLS - (count + 1)], kv);
+      count++;
+    }
+    assertEquals(NUM_OF_CELLS, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index c5aae00..db0205e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -58,15 +58,15 @@ import static org.junit.Assert.assertTrue;
 public class TestCompactingMemStore extends TestDefaultMemStore {
 
   private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
-  private static MemStoreChunkPool chunkPool;
-  private HRegion region;
-  private RegionServicesForStores regionServicesForStores;
-  private HStore store;
+  protected static MemStoreChunkPool chunkPool;
+  protected HRegion region;
+  protected RegionServicesForStores regionServicesForStores;
+  protected HStore store;
 
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
   //////////////////////////////////////////////////////////////////////////////
-  private static byte[] makeQualifier(final int i1, final int i2) {
+  protected static byte[] makeQualifier(final int i1, final int i2) {
     return Bytes.toBytes(Integer.toString(i1) + ";" +
         Integer.toString(i2));
   }
@@ -79,6 +79,12 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Override
   @Before
   public void setUp() throws Exception {
+    compactingSetUp();
+    this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR,
+        store, regionServicesForStores);
+  }
+
+  protected void compactingSetUp() throws Exception {
     super.internalSetUp();
     Configuration conf = new Configuration();
     conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
@@ -89,13 +95,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     this.region = hbaseUtility.createTestRegion("foobar", hcd);
     this.regionServicesForStores = region.getRegionServicesForStores();
     this.store = new HStore(region, hcd, conf);
-    this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR,
-        store, regionServicesForStores);
+
     chunkPool = MemStoreChunkPool.getPool(conf);
     assertTrue(chunkPool != null);
   }
 
-
   /**
    * A simple test which verifies the 3 possible states when scanning across snapshot.
    *
@@ -597,7 +601,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
@@ -624,11 +628,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
       Threads.sleep(1000);
     }
+    int counter = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3, counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     addRowsByKeys(memstore, keys2);
-    assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
@@ -636,7 +645,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
@@ -667,7 +676,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     addRowsByKeys(memstore, keys2);
 
@@ -675,16 +684,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
         region.getMemstoreSize() + ", Memstore Total Size: " +
         regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
 
-    assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     ((CompactingMemStore)memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     addRowsByKeys(memstore, keys3);
-    assertEquals(1128, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     ((CompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -693,7 +702,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
new file mode 100644
index 0000000..1933343
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.List;
+
+
+
+/**
+ * compacted memstore test case
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
+
+  private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
+  //private static MemStoreChunkPool chunkPool;
+  //private HRegion region;
+  //private RegionServicesForStores regionServicesForStores;
+  //private HStore store;
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Helpers
+  //////////////////////////////////////////////////////////////////////////////
+
+  @Override public void tearDown() throws Exception {
+    chunkPool.clearChunks();
+  }
+
+  @Override public void setUp() throws Exception {
+    compactingSetUp();
+    Configuration conf = HBaseConfiguration.create();
+
+    conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap
+
+    this.memstore =
+        new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
+            regionServicesForStores);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testCompaction1Bucket() throws IOException {
+    int counter = 0;
+    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
+
+    // test 1 bucket
+    addRowsByKeys(memstore, keys1);
+    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    assertEquals(4, memstore.getActive().getCellsCount());
+    long size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+    for ( Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3, counter);
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(3, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  public void testCompaction2Buckets() throws IOException {
+
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+
+    addRowsByKeys(memstore, keys1);
+    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    long size = memstore.getFlushableSize();
+
+//    assertTrue(
+//        "\n\n<<< This is the active size with 4 keys - " + memstore.getActive().getSize()
+//            + ". This is the memstore flushable size - " + size + "\n",false);
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(1000);
+    }
+    int counter = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3,counter);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    addRowsByKeys(memstore, keys2);
+    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    int i = 0;
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+      if (i > 10000000) {
+        ((CompactingMemStore) memstore).debug();
+        assertTrue("\n\n<<< Infinite loop! :( \n", false);
+      }
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    counter = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(4,counter);
+    assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  public void testCompaction3Buckets() throws IOException {
+
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+    String[] keys3 = { "D", "B", "B" };
+
+    addRowsByKeys(memstore, keys1);
+    assertEquals(496, region.getMemstoreSize());
+
+    long size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+
+    String tstStr = "\n\nFlushable size after first flush in memory:" + size + ". Is MemStore in compaction?:"
+        + ((CompactingMemStore) memstore).isMemStoreFlushingInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    addRowsByKeys(memstore, keys2);
+
+    tstStr += " After adding second part of the keys. Memstore size: " +
+        region.getMemstoreSize() + ", Memstore Total Size: " +
+        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
+
+    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    ((CompactingMemStore) memstore).disableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    addRowsByKeys(memstore, keys3);
+    assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    ((CompactingMemStore) memstore).enableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Flattening tests
+  //////////////////////////////////////////////////////////////////////////////
+  @Test
+  public void testFlattening() throws IOException {
+
+    String[] keys1 = { "A", "A", "B", "C", "F", "H"};
+    String[] keys2 = { "A", "B", "D", "G", "I", "J"};
+    String[] keys3 = { "D", "B", "B", "E" };
+
+    // set flattening to true
+    memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true);
+
+    addRowsByKeys(memstore, keys1);
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline, should not compact
+
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys2); // also should only flatten
+
+    ((CompactingMemStore) memstore).disableCompaction();
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys3);
+
+    ((CompactingMemStore) memstore).enableCompaction();
+
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    int counter = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(10,counter);
+
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    ImmutableSegment s = memstore.getSnapshot();
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  @Test
+  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
+    String[] keys1 = { "A", "B", "C" }; // A, B, C
+    addRowsByKeysWith50Cols(memstore, keys1);
+    // this should only flatten as there are no duplicates
+    ((CompactingMemStore) memstore).flushInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
+    MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners);
+    int count = 0;
+    while (scanner.next() != null) {
+      count++;
+    }
+    assertEquals("the count should be ", count, 150);
+    scanner.close();
+  }
+
+  @Test
+  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
+    String[] keys1 = { "A", "B", "C" }; // A, B, C
+    addRowsByKeysWith50Cols(memstore, keys1);
+    // this should only flatten as there are no duplicates
+    ((CompactingMemStore) memstore).flushInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    // Just doing the cnt operation here
+    MemStoreCompactorIterator itr = new MemStoreCompactorIterator(
+        ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
+        CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore());
+    int cnt = 0;
+    try {
+      while (itr.next() != null) {
+        cnt++;
+      }
+    } finally {
+      itr.close();
+    }
+    assertEquals("the count should be ", cnt, 150);
+  }
+
+
+  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      for (int j = 0; j < 50; j++) {
+        byte[] qf = Bytes.toBytes("testqualifier"+j);
+        byte[] val = Bytes.toBytes(keys[i] + j);
+        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+        hmc.add(kv);
+      }
+    }
+  }
+
+  private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf = Bytes.toBytes("testqualifier");
+    long size = hmc.getActive().getSize();//
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      byte[] val = Bytes.toBytes(keys[i] + i);
+      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+      hmc.add(kv);
+      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + 
kv.getTimestamp());
+    }
+    regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size);//
+  }
+
+  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
+    long t = 1234;
+
+    @Override public long currentTime() {
+      return t;
+    }
+
+    public void setCurrentTimeMillis(long t) {
+      this.t = t;
+    }
+  }
+
+}
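The setUp() above is the whole recipe for exercising the flat CellArrayMap path. As a standalone sketch (assuming store and regionServicesForStores were created from a test region, as in compactingSetUp() above; the key string and the value 2 are verbatim from this diff):

    // Sketch only: a CompactingMemStore whose in-memory compaction produces a CellArrayMap.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.hregion.compacting.memstore.type", 2); // 2 = CellArrayMap, 1 = skip-list
    CompactingMemStore memstore =
        new CompactingMemStore(conf, CellComparator.COMPARATOR, store, regionServicesForStores);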

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 765c9cf..2042f52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -5109,7 +5109,7 @@ public class TestHRegion {
    *
    * @throws IOException
    */
-  private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
+  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
       throws IOException {
     byte[][] families = { fs };
     Scan scan = new Scan();
@@ -5172,7 +5172,7 @@ public class TestHRegion {
     }
   }
 
-  private Configuration initSplit() {
+  protected Configuration initSplit() {
     // Always compact if there is more than one store file.
     CONF.setInt("hbase.hstore.compactionThreshold", 2);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
index be604af..d66899b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
@@ -19,10 +19,13 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
@@ -30,11 +33,19 @@ import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence;
 import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.ClassRule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * A test similar to TestHRegion, but with in-memory flush families.
  * Also checks wal truncation after in-memory compaction.
@@ -65,5 +76,88 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion{
         isReadOnly, durability, wal, inMemory, families);
   }
 
+  /**
+   * Splits twice and verifies getting from each of the split regions.
+   *
+   * @throws Exception
+   */
+  @Override
+  public void testBasicSplit() throws Exception {
+    byte[][] families = { fam1, fam2, fam3 };
+
+    Configuration hc = initSplit();
+    // Setting up region
+    String method = this.getName();
+    this.region = initHRegion(tableName, method, hc, families);
+
+    try {
+      LOG.info("" + HBaseTestCase.addContent(region, fam3));
+      region.flush(true);
+      region.compactStores();
+      byte[] splitRow = region.checkSplit();
+      assertNotNull(splitRow);
+      LOG.info("SplitRow: " + Bytes.toString(splitRow));
+      HRegion[] regions = splitRegion(region, splitRow);
+      try {
+        // Need to open the regions.
+        // TODO: Add an 'open' to HRegion... don't do open by constructing
+        // instance.
+        for (int i = 0; i < regions.length; i++) {
+          regions[i] = HRegion.openHRegion(regions[i], null);
+        }
+        // Assert can get rows out of new regions. Should be able to get first
+        // row from first region and the midkey from second region.
+        assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
+        assertGet(regions[1], fam3, splitRow);
+        // Test I can get scanner and that it starts at right place.
+        assertScan(regions[0], fam3, Bytes.toBytes(START_KEY));
+        assertScan(regions[1], fam3, splitRow);
+        // Now prove can't split regions that have references.
+        for (int i = 0; i < regions.length; i++) {
+          // Add so much data to this region that we create a store file bigger
+          // than one of our unsplittable references.
+          for (int j = 0; j < 2; j++) {
+            HBaseTestCase.addContent(regions[i], fam3);
+          }
+          HBaseTestCase.addContent(regions[i], fam2);
+          HBaseTestCase.addContent(regions[i], fam1);
+          regions[i].flush(true);
+        }
+
+        byte[][] midkeys = new byte[regions.length][];
+        // To make regions splittable, force compaction.
+        for (int i = 0; i < regions.length; i++) {
+          regions[i].compactStores();
+          midkeys[i] = regions[i].checkSplit();
+        }
+
+        TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
+        // Split these two daughter regions so then I'll have 4 regions. Will
+        // split because added data above.
+        for (int i = 0; i < regions.length; i++) {
+          HRegion[] rs = null;
+          if (midkeys[i] != null) {
+            rs = splitRegion(regions[i], midkeys[i]);
+            for (int j = 0; j < rs.length; j++) {
+              sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()),
+                  HRegion.openHRegion(rs[j], null));
+            }
+          }
+        }
+        LOG.info("Made 4 regions");
+      } finally {
+        for (int i = 0; i < regions.length; i++) {
+          try {
+            regions[i].close();
+          } catch (IOException e) {
+            // Ignore.
+          }
+        }
+      }
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 2acfd12..a6c7912 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -135,12 +135,264 @@ public class TestWalAndCompactingMemStoreFlush {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
-        .getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 *
-        1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+        FlushNonSloppyStoresFirstPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
+
+    // Initialize the region
+    Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
+
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));        // compacted memstore, all the keys are unique
+
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          // compacted memstore, subject for compaction due to duplications
+          region.put(createDoublePut(3, i));
+        }
+      }
+    }
+
+    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
+    for (int i = 100; i < 2000; i++) {
+      region.put(createPut(2, i));
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize();
+
+    // Find the smallest LSNs for edits wrt each CF.
+    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    String s = "\n\n----------------------------------\n"
+        + "Upon initial insert and before any flush, size of CF1 is:"
+        + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+        + region.getStore(FAMILY1).getMemStore().isSloppy() + ". Size of CF2 is:"
+        + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+        + region.getStore(FAMILY2).getMemStore().isSloppy() + ". Size of CF3 is:"
+        + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+        + region.getStore(FAMILY3).getMemStore().isSloppy() + "\n";
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1
+    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
+    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
+    assertTrue(cf1MemstoreSizePhaseI > 0);
+    assertTrue(cf2MemstoreSizePhaseI > 0);
+    assertTrue(cf3MemstoreSizePhaseI > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    String msg = "totalMemstoreSize="+totalMemstoreSize +
+        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
+        " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+        +
+        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
+        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
+        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
+    assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD,
+        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+
+    // Flush!!!!!!!!!!!!!!!!!!!!!!
+    // We have big compacting memstore CF1 and two small memstores:
+    // CF2 (not compacted) and CF3 (compacting)
+    // All together they are above the flush size lower bound.
+    // Since CF1 and CF3 should be flushed to memory (not to disk),
+    // CF2 is going to be flushed to disk.
+    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
+    ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
+    ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
+    region.flush(false);
+
+    // CF3 should be compacted so wait here to be sure the compaction is done
+    while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
+        .isMemStoreFlushingInMemory())
+      Threads.sleep(10);
+
+    // Recalculate everything
+    long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
+
+    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    // Find the smallest LSNs for edits wrt each CF.
+    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
+
+    s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
+        + ", CompactingMemStore DEEP_OVERHEAD is:" + 
CompactingMemStore.DEEP_OVERHEAD
+        + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" 
+ CompactingMemStore
+        .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+        + "\n----After first flush! CF1 should be flushed to memory, but not 
compacted.---\n"
+        + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + 
cf2MemstoreSizePhaseII
+        + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
+
+    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
+    assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
+
+    // CF2 should become empty
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
+
+    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
+    assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD +
+        CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM >
+        cf3MemstoreSizePhaseII);
+
+    // CF3 was compacted and flattened!
+    assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI
+            + ", size of CF3 in phase II - " + cf3MemstoreSizePhaseII + "\n",
+        cf3MemstoreSizePhaseI / 2 > cf3MemstoreSizePhaseII);
+
+
+    // Now the smallest LSN in the region should be the same as the smallest
+    // LSN in the memstore of CF1.
+    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
+
+    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
+    // memory in next flush
+    for (int i = 1200; i < 3000; i++) {
+      region.put(createPut(1, i));
+    }
+
+    s = s + "The smallest sequence in region WAL is: " + 
smallestSeqInRegionCurrentMemstorePhaseII
+        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + 
smallestSeqCF3PhaseII + "\n";
+
+    // How much does the CF1 memstore occupy? Will be used later.
+    long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
+    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
+
+    s = s + "----After more puts into CF1 its size is:" + 
cf1MemstoreSizePhaseIII
+        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
+
+
+    // Flush!!!!!!!!!!!!!!!!!!!!!!
+    // Flush again, CF1 is flushed to disk
+    // CF2 is flushed to disk, because it is not in-memory compacted memstore
+    // CF3 is flushed empty to memory (actually nothing happens to CF3)
+    region.flush(false);
+
+    // Recalculate everything
+    long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
+
+    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
+
+    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + 
", CF2 size is:"
+        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
+        + "\n";
+
+    s = s + "The smallest sequence in region WAL is: " + 
smallestSeqInRegionCurrentMemstorePhaseIV
+        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + 
smallestSeqCF3PhaseIV
+        + "\n";
+
+    // CF1's pipeline component (inserted before first flush) should be flushed to disk
+    // CF2 should be flushed to disk
+    assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV);
+
+    // CF3 shouldn't have been touched.
+    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
+
+    // the smallest LSN of CF3 shouldn't change
+    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
+
+    // CF3 should be bottleneck for WAL
+     assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
+
+    // Flush!!!!!!!!!!!!!!!!!!!!!!
+    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
+    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
+    // Note that active set of CF3 is empty
+    // But active set of CF1 is not yet empty
+    region.flush(true);
+
+    // Recalculate everything
+    long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
+    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    assertTrue(DefaultMemStore.DEEP_OVERHEAD < cf1MemstoreSizePhaseV);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV);
+
+    region.flush(true); // flush once again in order to be sure that everything is empty
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, region.getStore(FAMILY1).getMemStoreSize());
+
+    // What happens when we hit the memstore limit, but we are not able to find
+    // any Column Family above the threshold?
+    // In that case, we should flush all the CFs.
+
+    // The memstore limit is 200*1024 and the column family flush threshold is
+    // around 50*1024. We try to just hit the memstore limit with each CF's
+    // memstore being below the CF flush threshold.
+    for (int i = 1; i <= 300; i++) {
+      region.put(createPut(1, i));
+      region.put(createPut(2, i));
+      region.put(createPut(3, i));
+      region.put(createPut(4, i));
+      region.put(createPut(5, i));
+    }
+
+    region.flush(false);
+
+    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region 
WAL is: "
+        + smallestSeqInRegionCurrentMemstorePhaseV
+        + ". After additional inserts and last flush, the entire region size 
is:" + region
+        .getMemstoreSize()
+        + "\n----------------------------------\n";
+
+    // Since we won't find any CF above the threshold, and hence no specific
+    // store to flush, we should flush all the memstores
+    // Also compacted memstores are flushed to disk.
+    assertEquals(0, region.getMemstoreSize());
+    System.out.println(s);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+  }
+
+  @Test(timeout = 180000)
+  public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException {
+
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+        FlushNonSloppyStoresFirstPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
 
+    // set memstore segment flattening to false and compact to skip-list
+    conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false);
+    conf.setInt("hbase.hregion.compacting.memstore.type",1);
+
     // Initialize the region
     Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
 
@@ -201,7 +453,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // memstores of CF1, CF2 and CF3.
     String msg = "totalMemstoreSize="+totalMemstoreSize +
         " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
-        " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM +
+        " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+        +
         " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
         " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
         " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
@@ -238,8 +491,8 @@ public class TestWalAndCompactingMemStoreFlush {
 
     s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
         + ", CompactingMemStore DEEP_OVERHEAD is:" + 
CompactingMemStore.DEEP_OVERHEAD
-        + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_ITEM is:" + 
CompactingMemStore
-        .DEEP_OVERHEAD_PER_PIPELINE_ITEM
+        + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" 
+ CompactingMemStore
+        .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
         + "\n----After first flush! CF1 should be flushed to memory, but not 
compacted.---\n"
         + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + 
cf2MemstoreSizePhaseII
         + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
@@ -247,14 +500,14 @@ public class TestWalAndCompactingMemStoreFlush {
     // CF1 was flushed to memory, but there is nothing to compact, should
     // remain the same size plus renewed empty skip-list
     assertEquals(s, cf1MemstoreSizePhaseII,
-        cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM);
+        cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM);
 
     // CF2 should become empty
     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
 
     // verify that CF3 was flushed to memory and was compacted (this is approximation check)
     assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD +
-        CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM >
+        CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM >
         cf3MemstoreSizePhaseII);
     assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII);
 
@@ -322,7 +575,7 @@ public class TestWalAndCompactingMemStoreFlush {
     assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
 
     // CF3 should be bottleneck for WAL
-     assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
+    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
 
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // Clearing the existing memstores, CF2 all flushed to disk. The single
@@ -419,7 +672,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // memstores of CF1, CF2 and CF3.
     String msg = "totalMemstoreSize="+totalMemstoreSize +
         " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
-        " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM +
+        " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+        +
         " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
         " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
         " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;

http://git-wip-us.apache.org/repos/asf/hbase/blob/32c21f45/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index e4f52e9..67dde53 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -815,7 +815,8 @@ module Hbase
       family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE)
       family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE)
       family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
-      family.setCompacted(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION)
+      family.setInMemoryCompaction(
+                  JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION)
       family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
       family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING)
       family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
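The shell bug fixed here is that IN_MEMORY_COMPACTION was routed to setCompacted, which is not the setter the rest of this change set uses. On the Java side the corrected call corresponds to the following sketch (the column family name "f" is illustrative):

    // Sketch only: enabling in-memory compaction on a column family descriptor.
    HColumnDescriptor family = new HColumnDescriptor("f");
    family.setInMemoryCompaction(true);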
