HBASE-16608 Introducing the ability to merge ImmutableSegments without 
copy-compaction or SQM usage. (Anastasia)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/988d1f9b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/988d1f9b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/988d1f9b

Branch: refs/heads/master
Commit: 988d1f9bc955de53b9bf520b69be5bedc51f4f7e
Parents: 1b12a60
Author: anoopsamjohn <anoopsamj...@gmail.com>
Authored: Mon Oct 24 23:09:48 2016 +0530
Committer: anoopsamjohn <anoopsamj...@gmail.com>
Committed: Mon Oct 24 23:09:48 2016 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  |  11 +-
 .../hbase/regionserver/CompactionPipeline.java  |  25 +-
 .../hbase/regionserver/HeapMemStoreLAB.java     |  12 +-
 .../regionserver/ImmutableMemStoreLAB.java      |  92 ++++
 .../hbase/regionserver/ImmutableSegment.java    |  28 +-
 .../hbase/regionserver/MemStoreCompactor.java   | 272 +++++-----
 .../regionserver/MemStoreCompactorIterator.java | 159 ------
 .../MemStoreCompactorSegmentsIterator.java      | 149 ++++++
 .../MemStoreMergerSegmentsIterator.java         |  68 +++
 .../regionserver/MemStoreSegmentsIterator.java  |  64 +++
 .../hbase/regionserver/SegmentFactory.java      |  42 +-
 .../regionserver/VersionedSegmentsList.java     |   3 +-
 .../regionserver/TestCompactingMemStore.java    |  15 +
 .../TestCompactingToCellArrayMapMemStore.java   |  35 +-
 .../hbase/regionserver/TestMemStoreLAB.java     |   4 +-
 .../regionserver/TestPerColumnFamilyFlush.java  |   2 +-
 .../TestWalAndCompactingMemStoreFlush.java      | 516 ++++++++++++++-----
 17 files changed, 1038 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 177f222..1ecd868 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -195,9 +195,9 @@ public class CompactingMemStore extends AbstractMemStore {
     return list;
   }
 
-  public boolean swapCompactedSegments(VersionedSegmentsList versionedList,
-      ImmutableSegment result) {
-    return pipeline.swap(versionedList, result);
+  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, 
ImmutableSegment result,
+      boolean merge) {
+    return pipeline.swap(versionedList, result, !merge);
   }
 
   /**
@@ -394,6 +394,11 @@ public class CompactingMemStore extends AbstractMemStore {
     allowCompaction.set(true);
   }
 
+  @VisibleForTesting
+  void initiateType() {
+    compactor.initiateAction();
+  }
+
   /**
    * @param cell Find the row that comes after this one.  If null, we return 
the
    *             first.

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 6a13f43..28c9383 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -90,13 +90,16 @@ public class CompactionPipeline {
    * Swapping only if there were no changes to the suffix of the list while it 
was compacted.
    * @param versionedList tail of the pipeline that was compacted
    * @param segment new compacted segment
+   * @param closeSuffix whether to close the suffix (to release memory), as 
part of swapping it out
+   *        During index merge op this will be false and for compaction it 
will be true.
    * @return true iff swapped tail with new compacted segment
    */
-  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment 
segment) {
+  public boolean swap(
+      VersionedSegmentsList versionedList, ImmutableSegment segment, boolean 
closeSuffix) {
     if (versionedList.getVersion() != version) {
       return false;
     }
-    LinkedList<ImmutableSegment> suffix;
+    List<ImmutableSegment> suffix;
     synchronized (pipeline){
       if(versionedList.getVersion() != version) {
         return false;
@@ -108,13 +111,14 @@ public class CompactionPipeline {
             + versionedList.getStoreSegments().size()
             + ", and the number of cells in new segment is:" + 
segment.getCellsCount());
       }
-      swapSuffix(suffix,segment);
+      swapSuffix(suffix,segment, closeSuffix);
     }
     if (region != null) {
       // update the global memstore size counter
       long suffixSize = getSegmentsKeySize(suffix);
       long newSize = segment.keySize();
       long delta = suffixSize - newSize;
+      assert ( closeSuffix || delta>0 ); // sanity check
       long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + 
newSize
@@ -204,10 +208,19 @@ public class CompactionPipeline {
     return pipeline.peekLast().keySize();
   }
 
-  private void swapSuffix(LinkedList<ImmutableSegment> suffix, 
ImmutableSegment segment) {
+  private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment 
segment,
+      boolean closeSegmentsInSuffix) {
     version++;
-    for (Segment itemInSuffix : suffix) {
-      itemInSuffix.close();
+    // During index merge we won't be closing the segments undergoing the 
merge. Segment#close()
+    // will release the MSLAB chunks to pool. But in case of index merge there 
won't be any data copy
+    // from old MSLABs. So the new cells in the new segment also refer to the same 
chunks. In case of data
+    // compaction, we would have copied the cells data from old MSLAB chunks 
into a new chunk
+    // created for the result segment. So we can release the chunks associated 
with the compacted
+    // segments.
+    if (closeSegmentsInSuffix) {
+      for (Segment itemInSuffix : suffix) {
+        itemInSuffix.close();
+      }
     }
     pipeline.removeAll(suffix);
     pipeline.addLast(segment);

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
index 378601d..99b2bb6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
@@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
 
   private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
   // A queue of chunks from pool contained by this memstore LAB
+  // TODO: in the future, it would be better to have List implementation 
instead of Queue,
+  // as FIFO order is not so important here
   @VisibleForTesting
   BlockingQueue<PooledChunk> pooledChunkQueue = null;
   private final int chunkSize;
@@ -101,11 +103,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
     }
 
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
-    Preconditions.checkArgument(
-      maxAlloc <= chunkSize,
-      MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+    Preconditions.checkArgument(maxAlloc <= chunkSize,
+        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
   }
 
+
   @Override
   public Cell copyCellInto(Cell cell) {
     int size = KeyValueUtil.length(cell);
@@ -236,8 +238,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
     return this.curChunk.get();
   }
 
-  @VisibleForTesting
-  BlockingQueue<PooledChunk> getChunkQueue() {
+
+  BlockingQueue<PooledChunk> getPooledChunks() {
     return this.pooledChunkQueue;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
new file mode 100644
index 0000000..430b642
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A MemStoreLAB implementation which wraps N MemStoreLABs. Its main duty is 
properly managing the
+ * close of the individual MemStoreLABs. It is treated as immutable 
and so does not allow
+ * any more Cells to be added to it; {@link #copyCellInto(Cell)} throws IllegalStateException.
+ */
+@InterfaceAudience.Private
+public class ImmutableMemStoreLAB implements MemStoreLAB {
+
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+  private volatile boolean closed = false;
+
+  private final List<MemStoreLAB> mslabs;
+
+  public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) {
+    this.mslabs = mslabs;
+  }
+
+  @Override
+  public Cell copyCellInto(Cell cell) {
+    throw new IllegalStateException("This is an Immutable MemStoreLAB.");
+  }
+
+  @Override
+  public void close() {
+    // 'openScannerCount' here tracks the scanners opened on segments which 
directly refer to this
+    // MSLAB. The individual MSLABs this refers to also have their own 
'openScannerCount'. The usage of
+    // the variable in close() and decScannerCount() is the same as that in 
HeapMemStoreLAB. Here the
+    // close just delegates the call to the individual MSLABs. The actual 
return of the chunks to
+    // MSLABPool will happen within individual MSLABs only (which is at the 
leaf level).
+    // Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some 
point and at that time
+    // both of them were referred by ongoing scanners. So they have > 0 
'openScannerCount'. Now over
+    // the new Segment some scanners come in and this MSLABs 
'openScannerCount' also goes up and
+    // then come down on finish of scanners. Now a close() call comes to this 
Immutable MSLAB. As
+    // its 'openScannerCount' is zero it will call close() on both of the 
Heap MSLABs. Say by that
+    // time the old scanners on one of the MSLAB got over where as on the 
other, still an old
+    // scanner is going on. The call close() on that MSLAB will not close it 
immediately but will
+    // just mark it for closure as its 'openScannerCount' is still > 0. Later 
once the old scan is
+    // over, the decScannerCount() call will do the actual close and return of 
the chunks.
+    this.closed = true;
+    // When there are still on going scanners over this MSLAB, we will defer 
the close until all
+    // scanners finish. We will just mark it for closure. See 
#decScannerCount(). This will be
+    // called at end of every scan. When it is marked for closure and scanner 
count reached 0, we
+    // will do the actual close then.
+    checkAndCloseMSLABs(openScannerCount.get());
+  }
+
+  private void checkAndCloseMSLABs(int openScanners) {
+    if (openScanners == 0) {
+      for (MemStoreLAB mslab : this.mslabs) {
+        mslab.close();
+      }
+    }
+  }
+
+  @Override
+  public void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  @Override
+  public void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+    if (this.closed) {
+      checkAndCloseMSLABs(count);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 12b7916..8e79ad5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -85,13 +85,14 @@ public class ImmutableSegment extends Segment {
    * The input parameter "type" exists for future use when more types of flat 
ImmutableSegments
    * are going to be introduced.
    */
-  protected ImmutableSegment(CellComparator comparator, 
MemStoreCompactorIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, Type type) {
+  protected ImmutableSegment(CellComparator comparator, 
MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) {
+
     super(null, // initiailize the CellSet with NULL
         comparator, memStoreLAB);
     this.type = type;
     // build the true CellSet based on CellArrayMap
-    CellSet cs = createCellArrayMapSet(numOfCells, iterator);
+    CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
 
     this.setCellSet(null, cs);            // update the CellSet of the new 
Segment
     this.timeRange = this.timeRangeTracker == null ? null : 
this.timeRangeTracker.toTimeRange();
@@ -102,7 +103,7 @@ public class ImmutableSegment extends Segment {
    * list of older ImmutableSegments.
    * The given iterator returns the Cells that "survived" the compaction.
    */
-  protected ImmutableSegment(CellComparator comparator, 
MemStoreCompactorIterator iterator,
+  protected ImmutableSegment(CellComparator comparator, 
MemStoreSegmentsIterator iterator,
       MemStoreLAB memStoreLAB) {
     super(new CellSet(comparator), // initiailize the CellSet with empty 
CellSet
         comparator, memStoreLAB);
@@ -155,7 +156,7 @@ public class ImmutableSegment extends Segment {
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on 
ConcurrentSkipListMap to one
    * based on CellArrayMap.
-   * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is 
NOP
+   * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is 
NOOP
    *
    * Synchronization of the CellSet replacement:
    * The reference to the CellSet is AtomicReference and is updated only when 
ImmutableSegment
@@ -188,19 +189,26 @@ public class ImmutableSegment extends Segment {
   /////////////////////  PRIVATE METHODS  /////////////////////
   /*------------------------------------------------------------------------*/
   // Create CellSet based on CellArrayMap from compacting iterator
-  private CellSet createCellArrayMapSet(int numOfCells, 
MemStoreCompactorIterator iterator) {
+  private CellSet createCellArrayMapSet(int numOfCells, 
MemStoreSegmentsIterator iterator,
+      boolean merge) {
 
     Cell[] cells = new Cell[numOfCells];   // build the Cell Array
     int i = 0;
     while (iterator.hasNext()) {
       Cell c = iterator.next();
       // The scanner behind the iterator is doing all the elimination logic
-      // now we just copy it to the new segment (also MSLAB copy)
-      cells[i] = maybeCloneWithAllocator(c);
-      boolean usedMSLAB = (cells[i] != c);
+      if (merge) {
+        // if this is merge we just move the Cell object without copying MSLAB
+        // the sizes still need to be updated in the new segment
+        cells[i] = c;
+      } else {
+        // now we just copy it to the new segment (also MSLAB copy)
+        cells[i] = maybeCloneWithAllocator(c);
+      }
+      boolean useMSLAB = (getMemStoreLAB()!=null);
       // second parameter true, because in compaction addition of the cell to 
new segment
       // is always successful
-      updateMetaInfo(c, true, usedMSLAB); // updates the size per cell
+      updateMetaInfo(c, true, useMSLAB); // updates the size per cell
       i++;
     }
     // build the immutable CellSet

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 714ffe3..0df3674 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
@@ -43,37 +44,30 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class MemStoreCompactor {
 
   public static final long DEEP_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT 
+ Bytes.SIZEOF_DOUBLE
-          + ClassSize.ATOMIC_BOOLEAN);
-
-  // Option for external guidance whether flattening is allowed
-  static final String MEMSTORE_COMPACTOR_FLATTENING = 
"hbase.hregion.compacting.memstore.flatten";
-  static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true;
-
-  // Option for external setting of the compacted structure (SkipList, 
CellArray, etc.)
+      .align(ClassSize.OBJECT
+          + 4 * ClassSize.REFERENCE
+          // compactingMemStore, versionedList, action, isInterrupted (the 
reference)
+          // "action" is an enum and thus it is a class with static final 
constants,
+          // so counting only the size of the reference to it and not the size 
of the internals
+          + Bytes.SIZEOF_INT            // compactionKVMax
+          + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
+      );
+
+  // Configuration options for MemStore compaction
+  static final String INDEX_COMPACTION_CONFIG = "index-compaction";
+  static final String DATA_COMPACTION_CONFIG  = "data-compaction";
+
+  // The external setting of the compacting MemStore behaviour
+  // Compaction of the index without the data is the default
   static final String COMPACTING_MEMSTORE_TYPE_KEY = 
"hbase.hregion.compacting.memstore.type";
-  static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2;  // 
COMPACT_TO_ARRAY_MAP as default
-
-  // What percentage of the duplications is causing compaction?
-  static final String COMPACTION_THRESHOLD_REMAIN_FRACTION
-      = "hbase.hregion.compacting.memstore.comactPercent";
-  static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2;
+  static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = 
INDEX_COMPACTION_CONFIG;
 
-  // Option for external guidance whether the flattening is allowed
-  static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN
-      = "hbase.hregion.compacting.memstore.avoidSpeculativeScan";
-  static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = 
false;
+  // The upper bound for the number of segments we store in the pipeline prior 
to merging.
+  // This constant is subject to further experimentation.
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
 
-  /**
-   * Types of Compaction
-   */
-  private enum Type {
-    COMPACT_TO_SKIPLIST_MAP,
-    COMPACT_TO_ARRAY_MAP
-  }
-
   private CompactingMemStore compactingMemStore;
 
   // a static version of the segment list from the pipeline
@@ -82,22 +76,28 @@ public class MemStoreCompactor {
   // a flag raised when compaction is requested to stop
   private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
 
-  // the limit to the size of the groups to be later provided to 
MemStoreCompactorIterator
+  // the limit to the size of the groups to be later provided to 
MemStoreSegmentsIterator
   private final int compactionKVMax;
 
-  double fraction = 0.8;
-
-  int immutCellsNum = 0;  // number of immutable for compaction cells
+  /**
+   * Types of actions to be done on the pipeline upon MemStoreCompaction 
invocation.
+   * Note that every value covers the previous ones, i.e. if MERGE is the 
action it implies
+   * that the youngest segment is going to be flattened anyway.
+   */
+  private enum Action {
+    NOOP,
+    FLATTEN,  // flatten the youngest segment in the pipeline
+    MERGE,    // merge all the segments in the pipeline into one
+    COMPACT   // copy-compact the data of all the segments in the pipeline
+  }
 
-  private Type type = Type.COMPACT_TO_ARRAY_MAP;
+  private Action action = Action.FLATTEN;
 
   public MemStoreCompactor(CompactingMemStore compactingMemStore) {
     this.compactingMemStore = compactingMemStore;
-    this.compactionKVMax = compactingMemStore.getConfiguration().getInt(
-        HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
-    this.fraction = 1 - compactingMemStore.getConfiguration().getDouble(
-        COMPACTION_THRESHOLD_REMAIN_FRACTION,
-        COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT);
+    this.compactionKVMax = compactingMemStore.getConfiguration()
+        .getInt(HConstants.COMPACTION_KV_MAX, 
HConstants.COMPACTION_KV_MAX_DEFAULT);
+    initiateAction();
   }
 
   /**----------------------------------------------------------------------
@@ -106,26 +106,16 @@ public class MemStoreCompactor {
    * is already an ongoing compaction or no segments to compact.
    */
   public boolean start() throws IOException {
-    if (!compactingMemStore.hasImmutableSegments()) return false;  // no 
compaction on empty
-
-    int t = 
compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY,
-        COMPACTING_MEMSTORE_TYPE_DEFAULT);
-
-    switch (t) {
-      case 1: type = Type.COMPACT_TO_SKIPLIST_MAP;
-        break;
-      case 2: type = Type.COMPACT_TO_ARRAY_MAP;
-        break;
-      default: throw new RuntimeException("Unknown type " + type); // sanity 
check
+    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on 
empty pipeline
+      return false;
     }
 
     // get a snapshot of the list of the segments from the pipeline,
     // this local copy of the list is marked with specific version
     versionedList = compactingMemStore.getImmutableSegments();
-    immutCellsNum = versionedList.getNumOfCells();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " 
for store "
+      LOG.debug("Starting the In-Memory Compaction for store "
           + compactingMemStore.getStore().getColumnFamilyName());
     }
 
@@ -143,7 +133,14 @@ public class MemStoreCompactor {
   }
 
   /**----------------------------------------------------------------------
-  * Close the scanners and clear the pointers in order to allow good
+   * The interface to check whether user requested the index-compaction
+   */
+  public boolean isIndexCompaction() {
+    return (action == Action.MERGE);
+  }
+
+  /**----------------------------------------------------------------------
+  * Reset the interruption indicator and clear the pointers in order to allow 
good
   * garbage collection
   */
   private void releaseResources() {
@@ -152,45 +149,35 @@ public class MemStoreCompactor {
   }
 
   /**----------------------------------------------------------------------
-   * Check whether there are some signs to definitely not to flatten,
-   * returns false if we must compact. If this method returns true we
-   * still need to evaluate the compaction.
+   * Decide what to do with the new and old segments in the compaction 
pipeline.
+   * Implements basic in-memory compaction policy.
    */
-  private boolean shouldFlatten() {
-    boolean userToFlatten =         // the user configurable option to flatten 
or not to flatten
-        
compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING,
-            MEMSTORE_COMPACTOR_FLATTENING_DEFAULT);
-    if (userToFlatten==false) {
-      LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid 
flattening");
-      return false;                 // the user doesn't want to flatten
+  private Action policy() {
+
+    if (isInterrupted.get()) {      // if the entire process is interrupted 
cancel flattening
+      return Action.NOOP;           // the compaction also doesn't start when 
interrupted
     }
 
+    if (action == Action.COMPACT) { // compact according to the user request
+      LOG.debug("In-Memory Compaction Pipeline for store " + 
compactingMemStore.getFamilyName()
+          + " is going to be compacted, number of"
+          + " cells before compaction is " + versionedList.getNumOfCells());
+      return Action.COMPACT;
+    }
+
+    // compaction shouldn't happen or isn't worth it
     // limit the number of the segments in the pipeline
     int numOfSegments = versionedList.getNumOfSegments();
-    if (numOfSegments > 3) {        // hard-coded for now as it is going to 
move to policy
-      LOG.debug("In-Memory shrink is doing compaction, as there already are " 
+ numOfSegments
-          + " segments in the compaction pipeline");
-      return false;                 // to avoid "too many open files later", 
compact now
-    }
-    // till here we hvae all the signs that it is possible to flatten, run the 
speculative scan
-    // (if allowed by the user) to check the efficiency of compaction
-    boolean avoidSpeculativeScan =   // the user configurable option to avoid 
the speculative scan
-        
compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN,
-            MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT);
-    if (avoidSpeculativeScan==true) {
-      LOG.debug("In-Memory shrink is doing flattening, as user asked to avoid 
compaction "
-          + "evaluation");
-      return true;                  // flatten without checking the compaction 
expedience
+    if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
+      LOG.debug("In-Memory Compaction Pipeline for store " + 
compactingMemStore.getFamilyName()
+          + " is going to be merged, as there are " + numOfSegments + " 
segments");
+      return Action.MERGE;          // to avoid too many segments, merge now
     }
-    try {
-      immutCellsNum = countCellsForCompaction();
-      if (immutCellsNum > fraction * versionedList.getNumOfCells()) {
-        return true;
-      }
-    } catch(Exception e) {
-      return true;
-    }
-    return false;
+
+    // if nothing of the above, then just flatten the newly joined segment
+    LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for 
store "
+        + compactingMemStore.getFamilyName() + " is going to be flattened");
+    return Action.FLATTEN;
   }
 
   /**----------------------------------------------------------------------
@@ -201,95 +188,106 @@ public class MemStoreCompactor {
   private void doCompaction() {
     ImmutableSegment result = null;
     boolean resultSwapped = false;
-
+    Action nextStep = null;
     try {
-      // PHASE I: estimate the compaction expedience - EVALUATE COMPACTION
-      if (shouldFlatten()) {
-        // too much cells "survive" the possible compaction, we do not want to 
compact!
-        LOG.debug("In-Memory compaction does not pay off - storing the 
flattened segment"
-            + " for store: " + compactingMemStore.getFamilyName());
-        // Looking for Segment in the pipeline with SkipList index, to make it 
flat
+      nextStep = policy();
+
+      if (nextStep == Action.NOOP) {
+        return;
+      }
+      if (nextStep == Action.FLATTEN) {
+        // Youngest Segment in the pipeline is with SkipList index, make it 
flat
         compactingMemStore.flattenOneSegment(versionedList.getVersion());
         return;
       }
 
-      // PHASE II: create the new compacted ImmutableSegment - START 
COPY-COMPACTION
+      // Create one segment representing all segments in the compaction 
pipeline,
+      // either by compaction or by merge
       if (!isInterrupted.get()) {
-        result = compact(immutCellsNum);
+        result = createSubstitution();
       }
 
-      // Phase III: swap the old compaction pipeline - END COPY-COMPACTION
+      // Substitute the pipeline with one segment
       if (!isInterrupted.get()) {
-        if (resultSwapped = 
compactingMemStore.swapCompactedSegments(versionedList, result)) {
+        if (resultSwapped = compactingMemStore.swapCompactedSegments(
+            versionedList, result, (action==Action.MERGE))) {
           // update the wal so it can be truncated and not get too long
           compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // 
only if greater
         }
       }
-    } catch (Exception e) {
+    } catch (IOException e) {
       LOG.debug("Interrupting the MemStore in-memory compaction for store "
           + compactingMemStore.getFamilyName());
       Thread.currentThread().interrupt();
     } finally {
-      if ((result != null) && (!resultSwapped)) result.close();
+      // For the MERGE case, if the result was created, but swap didn't happen,
+      // we DON'T need to close the result segment (meaning its MSLAB)!
+      // Because closing the result segment means closing the chunks of all 
segments
+      // in the compaction pipeline, which still have ongoing scans.
+      if (nextStep != Action.MERGE) {
+        if ((result != null) && (!resultSwapped)) {
+          result.close();
+        }
+      }
       releaseResources();
     }
 
   }
 
   /**----------------------------------------------------------------------
-   * The copy-compaction is the creation of the ImmutableSegment (from the 
relevant type)
-   * based on the Compactor Iterator. The new ImmutableSegment is returned.
+   * Creation of the ImmutableSegment either by merge or copy-compact of the 
segments of the
+   * pipeline, based on the Compactor Iterator. The new ImmutableSegment is 
returned.
    */
-  private ImmutableSegment compact(int numOfCells) throws IOException {
-
-    LOG.debug("In-Memory compaction does pay off - The estimated number of 
cells "
-        + "after compaction is " + numOfCells + ", while number of cells 
before is " + versionedList
-        .getNumOfCells() + ". The fraction of remaining cells should be: " + 
fraction);
+  private ImmutableSegment createSubstitution() throws IOException {
 
     ImmutableSegment result = null;
-    MemStoreCompactorIterator iterator =
-        new MemStoreCompactorIterator(versionedList.getStoreSegments(),
-            compactingMemStore.getComparator(),
-            compactionKVMax, compactingMemStore.getStore());
-    try {
-      switch (type) {
-      case COMPACT_TO_SKIPLIST_MAP:
-        result = SegmentFactory.instance().createImmutableSegment(
-            compactingMemStore.getConfiguration(), 
compactingMemStore.getComparator(), iterator);
-        break;
-      case COMPACT_TO_ARRAY_MAP:
-        result = SegmentFactory.instance().createImmutableSegment(
-            compactingMemStore.getConfiguration(), 
compactingMemStore.getComparator(), iterator,
-            numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED);
-        break;
-      default: throw new RuntimeException("Unknown type " + type); // sanity 
check
-      }
-    } finally {
+    MemStoreSegmentsIterator iterator = null;
+
+    switch (action) {
+    case COMPACT:
+      iterator =
+          new 
MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
+              compactingMemStore.getComparator(),
+              compactionKVMax, compactingMemStore.getStore());
+
+      result = SegmentFactory.instance().createImmutableSegmentByCompaction(
+          compactingMemStore.getConfiguration(), 
compactingMemStore.getComparator(), iterator,
+          versionedList.getNumOfCells(), 
ImmutableSegment.Type.ARRAY_MAP_BASED);
       iterator.close();
+      break;
+    case MERGE:
+      iterator =
+          new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
+              compactingMemStore.getComparator(),
+              compactionKVMax, compactingMemStore.getStore());
+
+      result = SegmentFactory.instance().createImmutableSegmentByMerge(
+          compactingMemStore.getConfiguration(), 
compactingMemStore.getComparator(), iterator,
+          versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED,
+          versionedList.getStoreSegments());
+      iterator.close();
+      break;
+    default: throw new RuntimeException("Unknown action " + action); // sanity 
check
     }
 
     return result;
   }
 
   /**----------------------------------------------------------------------
-   * Count cells to estimate the efficiency of the future compaction
+   * Initiate the action according to the user config; the default is 
Action.MERGE
    */
-  private int countCellsForCompaction() throws IOException {
-
-    int cnt = 0;
-    MemStoreCompactorIterator iterator =
-        new MemStoreCompactorIterator(
-            versionedList.getStoreSegments(), 
compactingMemStore.getComparator(),
-            compactionKVMax, compactingMemStore.getStore());
+  @VisibleForTesting
+  void initiateAction() {
+    String memStoreType = 
compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY,
+        COMPACTING_MEMSTORE_TYPE_DEFAULT);
 
-    try {
-      while (iterator.next() != null) {
-        cnt++;
-      }
-    } finally {
-      iterator.close();
+    switch (memStoreType) {
+    case INDEX_COMPACTION_CONFIG: action = Action.MERGE;
+      break;
+    case DATA_COMPACTION_CONFIG: action = Action.COMPACT;
+      break;
+    default:
+      throw new RuntimeException("Unknown memstore type " + memStoreType); // 
sanity check
     }
-
-    return cnt;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
deleted file mode 100644
index 9798ec2..0000000
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * The MemStoreCompactorIterator is designed to perform one iteration over 
given list of segments
- * For another iteration new instance of MemStoreCompactorIterator needs to be 
created
- * The iterator is not thread-safe and must have only one instance in each 
period of time
- */
-@InterfaceAudience.Private
-public class MemStoreCompactorIterator implements Iterator<Cell> {
-
-  private List<Cell> kvs = new ArrayList<Cell>();
-
-  // scanner for full or partial pipeline (heap of segment scanners)
-  // we need to keep those scanners in order to close them at the end
-  private KeyValueScanner scanner;
-
-  // scanner on top of pipeline scanner that uses ScanQueryMatcher
-  private StoreScanner compactingScanner;
-
-  private final ScannerContext scannerContext;
-
-  private boolean hasMore;
-  private Iterator<Cell> kvsIterator;
-
-  // C-tor
-  public MemStoreCompactorIterator(List<ImmutableSegment> segments,
-      CellComparator comparator, int compactionKVMax, Store store) throws 
IOException {
-
-    this.scannerContext = 
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
-
-    // list of Scanners of segments in the pipeline, when compaction starts
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-
-    // create the list of scanners with maximally possible read point, meaning 
that
-    // all KVs are going to be returned by the pipeline traversing
-    for (Segment segment : segments) {
-      scanners.add(segment.getScanner(store.getSmallestReadPoint()));
-    }
-
-    scanner = new MemStoreScanner(comparator, scanners, true);
-
-    // reinitialize the compacting scanner for each instance of iterator
-    compactingScanner = createScanner(store, scanner);
-
-    hasMore = compactingScanner.next(kvs, scannerContext);
-
-    if (!kvs.isEmpty()) {
-      kvsIterator = kvs.iterator();
-    }
-
-  }
-
-  @Override
-  public boolean hasNext() {
-    if (!kvsIterator.hasNext()) {
-      // refillKVS() method should be invoked only if !kvsIterator.hasNext()
-      if (!refillKVS()) {
-        return false;
-      }
-    }
-    return (kvsIterator.hasNext() || hasMore);
-  }
-
-  @Override
-  public Cell next()  {
-    if (!kvsIterator.hasNext()) {
-      // refillKVS() method should be invoked only if !kvsIterator.hasNext()
-      if (!refillKVS())  return null;
-    }
-    return (!hasMore) ? null : kvsIterator.next();
-  }
-
-  public void close() {
-    compactingScanner.close();
-    compactingScanner = null;
-    scanner = null;
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Creates the scanner for compacting the pipeline.
-   *
-   * @return the scanner
-   */
-  private StoreScanner createScanner(Store store, KeyValueScanner scanner)
-      throws IOException {
-
-    Scan scan = new Scan();
-    scan.setMaxVersions();  //Get all available versions
-    StoreScanner internalScanner =
-        new StoreScanner(store, store.getScanInfo(), scan, 
Collections.singletonList(scanner),
-            ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
-            HConstants.OLDEST_TIMESTAMP);
-
-    return internalScanner;
-  }
-
-
-
-  private boolean refillKVS() {
-    kvs.clear();          // clear previous KVS, first initiated in the 
constructor
-    if (!hasMore) {       // if there is nothing expected next in 
compactingScanner
-      return false;
-    }
-
-    try {                 // try to get next KVS
-      hasMore = compactingScanner.next(kvs, scannerContext);
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    }
-
-    if (!kvs.isEmpty() ) {// is the new KVS empty ?
-      kvsIterator = kvs.iterator();
-      return true;
-    } else {
-      // KVS is empty, but hasMore still true?
-      if (hasMore) {      // try to move to next row
-        return refillKVS();
-      }
-
-    }
-    return hasMore;
-  }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
new file mode 100644
index 0000000..f31c973
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
+ * and performs the scan for compaction operation meaning it is based on SQM
+ */
+@InterfaceAudience.Private
+public class MemStoreCompactorSegmentsIterator extends 
MemStoreSegmentsIterator {
+
+  private List<Cell> kvs = new ArrayList<Cell>();
+  private boolean hasMore;
+  private Iterator<Cell> kvsIterator;
+
+  // scanner on top of pipeline scanner that uses ScanQueryMatcher
+  private StoreScanner compactingScanner;
+
+  // C-tor
+  public MemStoreCompactorSegmentsIterator(
+      List<ImmutableSegment> segments,
+      CellComparator comparator, int compactionKVMax, Store store
+  ) throws IOException {
+    super(segments,comparator,compactionKVMax,store);
+
+    // build the scanner based on Query Matcher
+    // reinitialize the compacting scanner for each instance of iterator
+    compactingScanner = createScanner(store, scanner);
+
+    hasMore = compactingScanner.next(kvs, scannerContext);
+
+    if (!kvs.isEmpty()) {
+      kvsIterator = kvs.iterator();
+    }
+
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (kvsIterator == null)  { // for the case when the result is empty
+      return false;
+    }
+    if (!kvsIterator.hasNext()) {
+      // refillKVS() method should be invoked only if !kvsIterator.hasNext()
+      if (!refillKVS()) {
+        return false;
+      }
+    }
+    return kvsIterator.hasNext();
+  }
+
+  @Override
+  public Cell next()  {
+    if (kvsIterator == null)  { // for the case when the result is empty
+      return null;
+    }
+    if (!kvsIterator.hasNext()) {
+      // refillKVS() method should be invoked only if !kvsIterator.hasNext()
+      if (!refillKVS())  return null;
+    }
+    return (!hasMore) ? null : kvsIterator.next();
+  }
+
+  public void close() {
+    compactingScanner.close();
+    compactingScanner = null;
+    scanner = null;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Creates the scanner for compacting the pipeline.
+   *
+   * @return the scanner
+   */
+  private StoreScanner createScanner(Store store, KeyValueScanner scanner)
+      throws IOException {
+
+    Scan scan = new Scan();
+    scan.setMaxVersions();  //Get all available versions
+    StoreScanner internalScanner =
+        new StoreScanner(store, store.getScanInfo(), scan, 
Collections.singletonList(scanner),
+            ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
+            HConstants.OLDEST_TIMESTAMP);
+
+    return internalScanner;
+  }
+
+
+  /* Refill key-value set (should be invoked only when KVS is empty)
+   * Returns true if KVS is non-empty */
+  private boolean refillKVS() {
+    kvs.clear();          // clear previous KVS, first initiated in the 
constructor
+    if (!hasMore) {       // if there is nothing expected next in 
compactingScanner
+      return false;
+    }
+
+    try {                 // try to get next KVS
+      hasMore = compactingScanner.next(kvs, scannerContext);
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    }
+
+    if (!kvs.isEmpty() ) {// is the new KVS empty ?
+      kvsIterator = kvs.iterator();
+      return true;
+    } else {
+      // KVS is empty, but hasMore still true?
+      if (hasMore) {      // try to move to next row
+        return refillKVS();
+      }
+
+    }
+    return hasMore;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
new file mode 100644
index 0000000..625fc76
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator
+ * and performs the scan for simple merge operation meaning it is NOT based on 
SQM
+ */
+@InterfaceAudience.Private
+public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator {
+
+  // C-tor
+  public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, 
CellComparator comparator,
+      int compactionKVMax, Store store
+  ) throws IOException {
+    super(segments,comparator,compactionKVMax,store);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (scanner.peek()!=null);
+  }
+
+  @Override
+  public Cell next()  {
+    Cell result = null;
+    try {                 // try to get next
+      result = scanner.next();
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    }
+    return result;
+  }
+
+  public void close() {
+    scanner.close();
+    scanner = null;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
new file mode 100644
index 0000000..8790bc2
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The MemStoreSegmentsIterator is designed to perform one iteration over a 
given list of segments.
+ * For another iteration, a new instance of MemStoreSegmentsIterator needs to be 
created.
+ * The iterator is not thread-safe and must have only one instance per MemStore
+ * at any given time.
+ */
+@InterfaceAudience.Private
+public abstract class MemStoreSegmentsIterator implements Iterator<Cell> {
+
+  // scanner for full or partial pipeline (heap of segment scanners)
+  // we need to keep those scanners in order to close them at the end
+  protected KeyValueScanner scanner;
+
+  protected final ScannerContext scannerContext;
+
+
+  // C-tor
+  public MemStoreSegmentsIterator(List<ImmutableSegment> segments, 
CellComparator comparator,
+      int compactionKVMax, Store store) throws IOException {
+
+    this.scannerContext = 
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+
+    // list of Scanners of segments in the pipeline, when compaction starts
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+
+    // create the list of scanners with the smallest read point, meaning that
+    // only relevant KVs are going to be returned by the pipeline traversing
+    for (Segment segment : segments) {
+      scanners.add(segment.getScanner(store.getSmallestReadPoint()));
+    }
+
+    scanner = new MemStoreScanner(comparator, scanners, true);
+  }
+
+  public abstract void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 510ebbd..4f60976 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -25,6 +25,8 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A singleton store segment factory.
@@ -46,18 +48,22 @@ public final class SegmentFactory {
 
   // create skip-list-based (non-flat) immutable segment from compacting old 
immutable segments
   public ImmutableSegment createImmutableSegment(final Configuration conf,
-      final CellComparator comparator, MemStoreCompactorIterator iterator) {
+      final CellComparator comparator, MemStoreSegmentsIterator iterator) {
     return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf));
   }
 
-  // create new flat immutable segment from compacting old immutable segment
-  public ImmutableSegment createImmutableSegment(final Configuration conf,
-      final CellComparator comparator, MemStoreCompactorIterator iterator, int 
numOfCells,
-      ImmutableSegment.Type segmentType) throws IOException {
-    Preconditions.checkArgument(segmentType != 
ImmutableSegment.Type.SKIPLIST_MAP_BASED,
+  // create new flat immutable segment from compacting old immutable segments
+  public ImmutableSegment createImmutableSegmentByCompaction(final 
Configuration conf,
+      final CellComparator comparator, MemStoreSegmentsIterator iterator, int 
numOfCells,
+      ImmutableSegment.Type segmentType)
+      throws IOException {
+    Preconditions.checkArgument(segmentType == 
ImmutableSegment.Type.ARRAY_MAP_BASED,
         "wrong immutable segment type");
-    return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf), 
numOfCells,
-        segmentType);
+    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+    return
+        // the last parameter "false" means not to merge, but to compact the 
pipeline
+        // in order to create the new segment
+        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, 
segmentType, false);
   }
 
   // create empty immutable segment
@@ -77,6 +83,19 @@ public final class SegmentFactory {
     return generateMutableSegment(conf, comparator, memStoreLAB);
   }
 
+  // create new flat immutable segment from merging old immutable segments
+  public ImmutableSegment createImmutableSegmentByMerge(final Configuration 
conf,
+      final CellComparator comparator, MemStoreSegmentsIterator iterator, int 
numOfCells,
+      ImmutableSegment.Type segmentType, List<ImmutableSegment> segments)
+      throws IOException {
+    Preconditions.checkArgument(segmentType == 
ImmutableSegment.Type.ARRAY_MAP_BASED,
+        "wrong immutable segment type");
+    MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
+    return
+        // the last parameter "true" means to merge the compaction pipeline
+        // in order to create the new segment
+        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, 
segmentType, true);
+  }
   //****** private methods to instantiate concrete store segments **********//
 
   private MutableSegment generateMutableSegment(final Configuration conf, 
CellComparator comparator,
@@ -96,4 +115,11 @@ public final class SegmentFactory {
     return memStoreLAB;
   }
 
+  private MemStoreLAB getMergedMemStoreLAB(Configuration conf, 
List<ImmutableSegment> segments) {
+    List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
+    for (ImmutableSegment segment : segments) {
+      mslabs.add(segment.getMemStoreLAB());
+    }
+    return new ImmutableMemStoreLAB(mslabs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 2e8bead..01160bf 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -43,7 +44,7 @@ public class VersionedSegmentsList {
     this.version = version;
   }
 
-  public LinkedList<ImmutableSegment> getStoreSegments() {
+  public List<ImmutableSegment> getStoreSegments() {
     return storeSegments;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 211a6d8..84efb09 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -508,6 +508,11 @@ public class TestCompactingMemStore extends 
TestDefaultMemStore {
   @Test
   public void testPuttingBackChunksWithOpeningPipelineScanner()
       throws IOException {
+
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", 
"data-compaction");
+    ((CompactingMemStore)memstore).initiateType();
+
     byte[] row = Bytes.toBytes("testrow");
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf1 = Bytes.toBytes("testqualifier1");
@@ -585,6 +590,10 @@ public class TestCompactingMemStore extends 
TestDefaultMemStore {
   @Test
   public void testCompaction1Bucket() throws IOException {
 
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", 
"data-compaction");
+    ((CompactingMemStore)memstore).initiateType();
+
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
     // test 1 bucket
@@ -609,6 +618,9 @@ public class TestCompactingMemStore extends 
TestDefaultMemStore {
   @Test
   public void testCompaction2Buckets() throws IOException {
 
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", 
"data-compaction");
+    ((CompactingMemStore)memstore).initiateType();
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
@@ -647,6 +659,9 @@ public class TestCompactingMemStore extends 
TestDefaultMemStore {
   @Test
   public void testCompaction3Buckets() throws IOException {
 
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", 
"data-compaction");
+    ((CompactingMemStore)memstore).initiateType();
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index f89a040..6499251 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+
 import org.apache.hadoop.hbase.util.Threads;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -61,7 +62,8 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     compactingSetUp();
     Configuration conf = HBaseConfiguration.create();
 
-    conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap
+    // set memstore to do data compaction and not to use the speculative scan
+    conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
 
     this.memstore =
         new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
@@ -215,18 +217,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
   }
 
   //////////////////////////////////////////////////////////////////////////////
-  // Flattening tests
+  // Merging tests
   //////////////////////////////////////////////////////////////////////////////
   @Test
-  public void testFlattening() throws IOException {
+  public void testMerging() throws IOException {
 
     String[] keys1 = { "A", "A", "B", "C", "F", "H"};
     String[] keys2 = { "A", "B", "D", "G", "I", "J"};
     String[] keys3 = { "D", "B", "B", "E" };
 
-    // set flattening to true
-    memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true);
-
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction");
+    ((CompactingMemStore)memstore).initiateType();
     addRowsByKeys(memstore, keys1);
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@@ -238,13 +239,31 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
 
     addRowsByKeys(memstore, keys2); // also should only flatten
 
+    int counter2 = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter2 += s.getCellsCount();
+    }
+    assertEquals(12, counter2);
+
     ((CompactingMemStore) memstore).disableCompaction();
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
     assertEquals(0, memstore.getSnapshot().getCellsCount());
 
+    int counter3 = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter3 += s.getCellsCount();
+    }
+    assertEquals(12, counter3);
+
     addRowsByKeys(memstore, keys3);
 
+    int counter4 = 0;
+    for ( Segment s : memstore.getSegments()) {
+      counter4 += s.getCellsCount();
+    }
+    assertEquals(16, counter4);
+
     ((CompactingMemStore) memstore).enableCompaction();
 
 
@@ -258,7 +277,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     for ( Segment s : memstore.getSegments()) {
       counter += s.getCellsCount();
     }
-    assertEquals(10,counter);
+    assertEquals(16,counter);
 
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
     ImmutableSegment s = memstore.getSnapshot();
@@ -295,7 +314,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       Threads.sleep(10);
     }
     // Just doing the cnt operation here
-    MemStoreCompactorIterator itr = new MemStoreCompactorIterator(
+    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
         ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
         CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore());
     int cnt = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index 1ea5112..fdd6b2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -172,7 +172,7 @@ public class TestMemStoreLAB {
   public void testLABChunkQueue() throws Exception {
     HeapMemStoreLAB mslab = new HeapMemStoreLAB();
     // by default setting, there should be no chunk queue initialized
-    assertNull(mslab.getChunkQueue());
+    assertNull(mslab.getPooledChunks());
     // reset mslab with chunk pool
     Configuration conf = HBaseConfiguration.create();
     conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1);
@@ -209,7 +209,7 @@ public class TestMemStoreLAB {
     // close the mslab
     mslab.close();
     // make sure all chunks reclaimed or removed from chunk queue
-    int queueLength = mslab.getChunkQueue().size();
+    int queueLength = mslab.getPooledChunks().size();
     assertTrue("All chunks in chunk queue should be reclaimed or removed"
         + " after mslab closed but actually: " + queueLength, queueLength == 0);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 6bfaa59..f0f8c39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -130,7 +130,7 @@ public class TestPerColumnFamilyFlush {
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
       100 * 1024);
     // Intialize the region
-    Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
+    Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));

Reply via email to