HBASE-16608 Introducing the ability to merge ImmutableSegments without copy-compaction or SQM usage. (Anastasia)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/988d1f9b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/988d1f9b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/988d1f9b Branch: refs/heads/master Commit: 988d1f9bc955de53b9bf520b69be5bedc51f4f7e Parents: 1b12a60 Author: anoopsamjohn <anoopsamj...@gmail.com> Authored: Mon Oct 24 23:09:48 2016 +0530 Committer: anoopsamjohn <anoopsamj...@gmail.com> Committed: Mon Oct 24 23:09:48 2016 +0530 ---------------------------------------------------------------------- .../hbase/regionserver/CompactingMemStore.java | 11 +- .../hbase/regionserver/CompactionPipeline.java | 25 +- .../hbase/regionserver/HeapMemStoreLAB.java | 12 +- .../regionserver/ImmutableMemStoreLAB.java | 92 ++++ .../hbase/regionserver/ImmutableSegment.java | 28 +- .../hbase/regionserver/MemStoreCompactor.java | 272 +++++----- .../regionserver/MemStoreCompactorIterator.java | 159 ------ .../MemStoreCompactorSegmentsIterator.java | 149 ++++++ .../MemStoreMergerSegmentsIterator.java | 68 +++ .../regionserver/MemStoreSegmentsIterator.java | 64 +++ .../hbase/regionserver/SegmentFactory.java | 42 +- .../regionserver/VersionedSegmentsList.java | 3 +- .../regionserver/TestCompactingMemStore.java | 15 + .../TestCompactingToCellArrayMapMemStore.java | 35 +- .../hbase/regionserver/TestMemStoreLAB.java | 4 +- .../regionserver/TestPerColumnFamilyFlush.java | 2 +- .../TestWalAndCompactingMemStoreFlush.java | 516 ++++++++++++++----- 17 files changed, 1038 insertions(+), 459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 177f222..1ecd868 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -195,9 +195,9 @@ public class CompactingMemStore extends AbstractMemStore { return list; } - public boolean swapCompactedSegments(VersionedSegmentsList versionedList, - ImmutableSegment result) { - return pipeline.swap(versionedList, result); + public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, + boolean merge) { + return pipeline.swap(versionedList, result, !merge); } /** @@ -394,6 +394,11 @@ public class CompactingMemStore extends AbstractMemStore { allowCompaction.set(true); } + @VisibleForTesting + void initiateType() { + compactor.initiateAction(); + } + /** * @param cell Find the row that comes after this one. If null, we return the * first. http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 6a13f43..28c9383 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -90,13 +90,16 @@ public class CompactionPipeline { * Swapping only if there were no changes to the suffix of the list while it was compacted. 
* @param versionedList tail of the pipeline that was compacted * @param segment new compacted segment + * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out + * During index merge op this will be false and for compaction it will be true. * @return true iff swapped tail with new compacted segment */ - public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { + public boolean swap( + VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { if (versionedList.getVersion() != version) { return false; } - LinkedList<ImmutableSegment> suffix; + List<ImmutableSegment> suffix; synchronized (pipeline){ if(versionedList.getVersion() != version) { return false; @@ -108,13 +111,14 @@ public class CompactionPipeline { + versionedList.getStoreSegments().size() + ", and the number of cells in new segment is:" + segment.getCellsCount()); } - swapSuffix(suffix,segment); + swapSuffix(suffix,segment, closeSuffix); } if (region != null) { // update the global memstore size counter long suffixSize = getSegmentsKeySize(suffix); long newSize = segment.keySize(); long delta = suffixSize - newSize; + assert ( closeSuffix || delta>0 ); // sanity check long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); if (LOG.isDebugEnabled()) { LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize @@ -204,10 +208,19 @@ public class CompactionPipeline { return pipeline.peekLast().keySize(); } - private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) { + private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment, + boolean closeSegmentsInSuffix) { version++; - for (Segment itemInSuffix : suffix) { - itemInSuffix.close(); + // During index merge we won't be closing the segments undergoing the merge. Segment#close() + // will release the MSLAB chunks to pool. 
But in case of index merge there won't be any data copy + // from old MSLABs. So the new cells in the new segment also refer to the same chunks. In case of data + // compaction, we would have copied the cells' data from old MSLAB chunks into a new chunk + // created for the result segment. So we can release the chunks associated with the compacted + // segments. + if (closeSegmentsInSuffix) { + for (Segment itemInSuffix : suffix) { + itemInSuffix.close(); + } } pipeline.removeAll(suffix); pipeline.addLast(segment); http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 378601d..99b2bb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>(); // A queue of chunks from pool contained by this memstore LAB + // TODO: in the future, it would be better to have List implementation instead of Queue, + // as FIFO order is not so important here @VisibleForTesting BlockingQueue<PooledChunk> pooledChunkQueue = null; private final int chunkSize; @@ -101,11 +103,11 @@ public class HeapMemStoreLAB implements MemStoreLAB { } // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
- Preconditions.checkArgument( - maxAlloc <= chunkSize, - MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); + Preconditions.checkArgument(maxAlloc <= chunkSize, + MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } + @Override public Cell copyCellInto(Cell cell) { int size = KeyValueUtil.length(cell); @@ -236,8 +238,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { return this.curChunk.get(); } - @VisibleForTesting - BlockingQueue<PooledChunk> getChunkQueue() { + + BlockingQueue<PooledChunk> getPooledChunks() { return this.pooledChunkQueue; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java new file mode 100644 index 0000000..430b642 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A MemStoreLAB implementation which wraps N MemStoreLABs. Its main duty is to properly manage the + * close of the individual MemStoreLABs. It is treated as immutable and so does not allow adding + * any more Cells into it: {@link #copyCellInto(Cell)} throws IllegalStateException. + */ +@InterfaceAudience.Private +public class ImmutableMemStoreLAB implements MemStoreLAB { + + private final AtomicInteger openScannerCount = new AtomicInteger(); + private volatile boolean closed = false; + + private final List<MemStoreLAB> mslabs; + + public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) { + this.mslabs = mslabs; + } + + @Override + public Cell copyCellInto(Cell cell) { + throw new IllegalStateException("This is an Immutable MemStoreLAB."); + } + + @Override + public void close() { + // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this + // MSLAB. Each individual MSLAB this one wraps also has its own 'openScannerCount'. The usage of + // the variable in close() and decScannerCount() is the same as in HeapMemStoreLAB. Here the + // close just delegates the call to the individual MSLABs. The actual return of the chunks to + // MSLABPool will happen within individual MSLABs only (which is at the leaf level). + // Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time + // both of them were referred to by ongoing scanners. So they have > 0 'openScannerCount'. Now over + // the new Segment some scanners come in and this MSLAB's 'openScannerCount' also goes up and + // then comes down when those scanners finish. Now a close() call comes to this Immutable MSLAB. As + // its 'openScannerCount' is zero it will call close() on both of the Heap MSLABs.
Say by that + // time the old scanners on one of the MSLABs are over, whereas on the other an old + // scanner is still going on. The close() call on that MSLAB will not close it immediately but will + // just mark it for closure as its 'openScannerCount' is still > 0. Later, once the old scan is + // over, the decScannerCount() call will do the actual close and return of the chunks. + this.closed = true; + // When there are still ongoing scanners over this MSLAB, we will defer the close until all + // scanners finish. We will just mark it for closure. See #decScannerCount(). This will be + // called at the end of every scan. When it is marked for closure and the scanner count reaches 0, we + // will do the actual close then. + checkAndCloseMSLABs(openScannerCount.get()); + } + + private void checkAndCloseMSLABs(int openScanners) { + if (openScanners == 0) { + for (MemStoreLAB mslab : this.mslabs) { + mslab.close(); + } + } + } + + @Override + public void incScannerCount() { + this.openScannerCount.incrementAndGet(); + } + + @Override + public void decScannerCount() { + int count = this.openScannerCount.decrementAndGet(); + if (this.closed) { + checkAndCloseMSLABs(count); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 12b7916..8e79ad5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -85,13 +85,14 @@ public class ImmutableSegment extends Segment { * The input parameter "type" exists for future use when more types of flat ImmutableSegments *
are going to be introduced. */ - protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, Type type) { + protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, + MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) { + super(null, // initiailize the CellSet with NULL comparator, memStoreLAB); this.type = type; // build the true CellSet based on CellArrayMap - CellSet cs = createCellArrayMapSet(numOfCells, iterator); + CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); this.setCellSet(null, cs); // update the CellSet of the new Segment this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); @@ -102,7 +103,7 @@ public class ImmutableSegment extends Segment { * list of older ImmutableSegments. * The given iterator returns the Cells that "survived" the compaction. */ - protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, + protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB) { super(new CellSet(comparator), // initiailize the CellSet with empty CellSet comparator, memStoreLAB); @@ -155,7 +156,7 @@ public class ImmutableSegment extends Segment { /**------------------------------------------------------------------------ * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one * based on CellArrayMap. 
- * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOP + * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP * * Synchronization of the CellSet replacement: * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment @@ -188,19 +189,26 @@ public class ImmutableSegment extends Segment { ///////////////////// PRIVATE METHODS ///////////////////// /*------------------------------------------------------------------------*/ // Create CellSet based on CellArrayMap from compacting iterator - private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) { + private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator, + boolean merge) { Cell[] cells = new Cell[numOfCells]; // build the Cell Array int i = 0; while (iterator.hasNext()) { Cell c = iterator.next(); // The scanner behind the iterator is doing all the elimination logic - // now we just copy it to the new segment (also MSLAB copy) - cells[i] = maybeCloneWithAllocator(c); - boolean usedMSLAB = (cells[i] != c); + if (merge) { + // if this is merge we just move the Cell object without copying MSLAB + // the sizes still need to be updated in the new segment + cells[i] = c; + } else { + // now we just copy it to the new segment (also MSLAB copy) + cells[i] = maybeCloneWithAllocator(c); + } + boolean useMSLAB = (getMemStoreLAB()!=null); // second parameter true, because in compaction addition of the cell to new segment // is always successful - updateMetaInfo(c, true, usedMSLAB); // updates the size per cell + updateMetaInfo(c, true, useMSLAB); // updates the size per cell i++; } // build the immutable CellSet http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 714ffe3..0df3674 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; @@ -43,37 +44,30 @@ import java.util.concurrent.atomic.AtomicBoolean; public class MemStoreCompactor { public static final long DEEP_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_DOUBLE - + ClassSize.ATOMIC_BOOLEAN); - - // Option for external guidance whether flattening is allowed - static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; - static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true; - - // Option for external setting of the compacted structure (SkipList, CellArray, etc.) 
+ .align(ClassSize.OBJECT + + 4 * ClassSize.REFERENCE + // compactingMemStore, versionedList, action, isInterrupted (the reference) + // "action" is an enum and thus it is a class with static final constants, + // so counting only the size of the reference to it and not the size of the internals + + Bytes.SIZEOF_INT // compactionKVMax + + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) + ); + + // Configuration options for MemStore compaction + static final String INDEX_COMPACTION_CONFIG = "index-compaction"; + static final String DATA_COMPACTION_CONFIG = "data-compaction"; + + // The external setting of the compacting MemStore behaviour + // Compaction of the index without the data is the default static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; - static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default - - // What percentage of the duplications is causing compaction? - static final String COMPACTION_THRESHOLD_REMAIN_FRACTION - = "hbase.hregion.compacting.memstore.comactPercent"; - static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2; + static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = INDEX_COMPACTION_CONFIG; - // Option for external guidance whether the flattening is allowed - static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN - = "hbase.hregion.compacting.memstore.avoidSpeculativeScan"; - static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false; + // The upper bound for the number of segments we store in the pipeline prior to merging. + // This constant is subject to further experimentation. 
+ private static final int THRESHOLD_PIPELINE_SEGMENTS = 1; private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); - /** - * Types of Compaction - */ - private enum Type { - COMPACT_TO_SKIPLIST_MAP, - COMPACT_TO_ARRAY_MAP - } - private CompactingMemStore compactingMemStore; // a static version of the segment list from the pipeline @@ -82,22 +76,28 @@ public class MemStoreCompactor { // a flag raised when compaction is requested to stop private final AtomicBoolean isInterrupted = new AtomicBoolean(false); - // the limit to the size of the groups to be later provided to MemStoreCompactorIterator + // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator private final int compactionKVMax; - double fraction = 0.8; - - int immutCellsNum = 0; // number of immutable for compaction cells + /** + * Types of actions to be done on the pipeline upon MemStoreCompaction invocation. + * Note that every value covers the previous ones, i.e. if MERGE is the action it implies + * that the youngest segment is going to be flattened anyway.
+ */ + private enum Action { + NOOP, + FLATTEN, // flatten the youngest segment in the pipeline + MERGE, // merge all the segments in the pipeline into one + COMPACT // copy-compact the data of all the segments in the pipeline + } - private Type type = Type.COMPACT_TO_ARRAY_MAP; + private Action action = Action.FLATTEN; public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; - this.compactionKVMax = compactingMemStore.getConfiguration().getInt( - HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); - this.fraction = 1 - compactingMemStore.getConfiguration().getDouble( - COMPACTION_THRESHOLD_REMAIN_FRACTION, - COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT); + this.compactionKVMax = compactingMemStore.getConfiguration() + .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + initiateAction(); } /**---------------------------------------------------------------------- @@ -106,26 +106,16 @@ public class MemStoreCompactor { * is already an ongoing compaction or no segments to compact. 
*/ public boolean start() throws IOException { - if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty - - int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, - COMPACTING_MEMSTORE_TYPE_DEFAULT); - - switch (t) { - case 1: type = Type.COMPACT_TO_SKIPLIST_MAP; - break; - case 2: type = Type.COMPACT_TO_ARRAY_MAP; - break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline + return false; } // get a snapshot of the list of the segments from the pipeline, // this local copy of the list is marked with specific version versionedList = compactingMemStore.getImmutableSegments(); - immutCellsNum = versionedList.getNumOfCells(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store " + LOG.debug("Starting the In-Memory Compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); } @@ -143,7 +133,14 @@ public class MemStoreCompactor { } /**---------------------------------------------------------------------- - * Close the scanners and clear the pointers in order to allow good + * The interface to check whether user requested the index-compaction + */ + public boolean isIndexCompaction() { + return (action == Action.MERGE); + } + + /**---------------------------------------------------------------------- + * Reset the interruption indicator and clear the pointers in order to allow good * garbage collection */ private void releaseResources() { @@ -152,45 +149,35 @@ public class MemStoreCompactor { } /**---------------------------------------------------------------------- - * Check whether there are some signs to definitely not to flatten, - * returns false if we must compact. If this method returns true we - * still need to evaluate the compaction. 
+ * Decide what to do with the new and old segments in the compaction pipeline. + * Implements basic in-memory compaction policy. */ - private boolean shouldFlatten() { - boolean userToFlatten = // the user configurable option to flatten or not to flatten - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, - MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); - if (userToFlatten==false) { - LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid flattening"); - return false; // the user doesn't want to flatten + private Action policy() { + + if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening + return Action.NOOP; // the compaction also doesn't start when interrupted } + if (action == Action.COMPACT) { // compact according to the user request + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be compacted, number of" + + " cells before compaction is " + versionedList.getNumOfCells()); + return Action.COMPACT; + } + + // compaction shouldn't happen or isn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy - LOG.debug("In-Memory shrink is doing compaction, as there already are " + numOfSegments - + " segments in the compaction pipeline"); - return false; // to avoid "too many open files later", compact now - } - // till here we hvae all the signs that it is possible to flatten, run the speculative scan - // (if allowed by the user) to check the efficiency of compaction - boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, - MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); - if (avoidSpeculativeScan==true) { - LOG.debug("In-Memory shrink is doing flattening, as
user asked to avoid compaction " - + "evaluation"); - return true; // flatten without checking the compaction expedience + if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be merged, as there are " + numOfSegments + " segments"); + return Action.MERGE; // to avoid too many segments, merge now } - try { - immutCellsNum = countCellsForCompaction(); - if (immutCellsNum > fraction * versionedList.getNumOfCells()) { - return true; - } - } catch(Exception e) { - return true; - } - return false; + + // if nothing of the above, then just flatten the newly joined segment + LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store " + + compactingMemStore.getFamilyName() + " is going to be flattened"); + return Action.FLATTEN; } /**---------------------------------------------------------------------- @@ -201,95 +188,106 @@ public class MemStoreCompactor { private void doCompaction() { ImmutableSegment result = null; boolean resultSwapped = false; - + Action nextStep = null; try { - // PHASE I: estimate the compaction expedience - EVALUATE COMPACTION - if (shouldFlatten()) { - // too much cells "survive" the possible compaction, we do not want to compact! 
- LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" - + " for store: " + compactingMemStore.getFamilyName()); - // Looking for Segment in the pipeline with SkipList index, to make it flat + nextStep = policy(); + + if (nextStep == Action.NOOP) { + return; + } + if (nextStep == Action.FLATTEN) { + // Youngest Segment in the pipeline is with SkipList index, make it flat compactingMemStore.flattenOneSegment(versionedList.getVersion()); return; } - // PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION + // Create one segment representing all segments in the compaction pipeline, + // either by compaction or by merge if (!isInterrupted.get()) { - result = compact(immutCellsNum); + result = createSubstitution(); } - // Phase III: swap the old compaction pipeline - END COPY-COMPACTION + // Substitute the pipeline with one segment if (!isInterrupted.get()) { - if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { + if (resultSwapped = compactingMemStore.swapCompactedSegments( + versionedList, result, (action==Action.MERGE))) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } } - } catch (Exception e) { + } catch (IOException e) { LOG.debug("Interrupting the MemStore in-memory compaction for store " + compactingMemStore.getFamilyName()); Thread.currentThread().interrupt(); } finally { - if ((result != null) && (!resultSwapped)) result.close(); + // For the MERGE case, if the result was created, but swap didn't happen, + // we DON'T need to close the result segment (meaning its MSLAB)! + // Because closing the result segment means closing the chunks of all segments + // in the compaction pipeline, which still have ongoing scans. 
+ if (nextStep != Action.MERGE) { + if ((result != null) && (!resultSwapped)) { + result.close(); + } + } releaseResources(); } } /**---------------------------------------------------------------------- - * The copy-compaction is the creation of the ImmutableSegment (from the relevant type) - * based on the Compactor Iterator. The new ImmutableSegment is returned. + * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the + * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private ImmutableSegment compact(int numOfCells) throws IOException { - - LOG.debug("In-Memory compaction does pay off - The estimated number of cells " - + "after compaction is " + numOfCells + ", while number of cells before is " + versionedList - .getNumOfCells() + ". The fraction of remaining cells should be: " + fraction); + private ImmutableSegment createSubstitution() throws IOException { ImmutableSegment result = null; - MemStoreCompactorIterator iterator = - new MemStoreCompactorIterator(versionedList.getStoreSegments(), - compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); - try { - switch (type) { - case COMPACT_TO_SKIPLIST_MAP: - result = SegmentFactory.instance().createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator); - break; - case COMPACT_TO_ARRAY_MAP: - result = SegmentFactory.instance().createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); - break; - default: throw new RuntimeException("Unknown type " + type); // sanity check - } - } finally { + MemStoreSegmentsIterator iterator = null; + + switch (action) { + case COMPACT: + iterator = + new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), + compactingMemStore.getComparator(), + compactionKVMax, compactingMemStore.getStore()); + + result = 
SegmentFactory.instance().createImmutableSegmentByCompaction( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, + versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED); iterator.close(); + break; + case MERGE: + iterator = + new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), + compactingMemStore.getComparator(), + compactionKVMax, compactingMemStore.getStore()); + + result = SegmentFactory.instance().createImmutableSegmentByMerge( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, + versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED, + versionedList.getStoreSegments()); + iterator.close(); + break; + default: throw new RuntimeException("Unknown action " + action); // sanity check } return result; } /**---------------------------------------------------------------------- - * Count cells to estimate the efficiency of the future compaction + * Initiate the action according to the user config; the default is Action.MERGE (index-compaction) */ - private int countCellsForCompaction() throws IOException { - - int cnt = 0; - MemStoreCompactorIterator iterator = - new MemStoreCompactorIterator( - versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + @VisibleForTesting + void initiateAction() { + String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY, + COMPACTING_MEMSTORE_TYPE_DEFAULT); - try { - while (iterator.next() != null) { - cnt++; - } - } finally { - iterator.close(); + switch (memStoreType) { + case INDEX_COMPACTION_CONFIG: action = Action.MERGE; + break; + case DATA_COMPACTION_CONFIG: action = Action.COMPACT; + break; + default: + throw new RuntimeException("Unknown memstore type " + memStoreType); // sanity check } - - return cnt; } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java deleted file mode 100644 index 9798ec2..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; - -import java.io.IOException; -import java.util.*; - -/** - * The MemStoreCompactorIterator is designed to perform one iteration over given list of segments - * For another iteration new instance of MemStoreCompactorIterator needs to be created - * The iterator is not thread-safe and must have only one instance in each period of time - */ -@InterfaceAudience.Private -public class MemStoreCompactorIterator implements Iterator<Cell> { - - private List<Cell> kvs = new ArrayList<Cell>(); - - // scanner for full or partial pipeline (heap of segment scanners) - // we need to keep those scanners in order to close them at the end - private KeyValueScanner scanner; - - // scanner on top of pipeline scanner that uses ScanQueryMatcher - private StoreScanner compactingScanner; - - private final ScannerContext scannerContext; - - private boolean hasMore; - private Iterator<Cell> kvsIterator; - - // C-tor - public MemStoreCompactorIterator(List<ImmutableSegment> segments, - CellComparator comparator, int compactionKVMax, Store store) throws IOException { - - this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - - // list of Scanners of segments in the pipeline, when compaction starts - List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); - - // create the list of scanners with maximally possible read point, meaning that - // all KVs are going to be returned by the pipeline traversing - for (Segment segment : segments) { - scanners.add(segment.getScanner(store.getSmallestReadPoint())); - } - - scanner = new MemStoreScanner(comparator, scanners, true); - - // reinitialize the compacting scanner for each instance of iterator - 
compactingScanner = createScanner(store, scanner); - - hasMore = compactingScanner.next(kvs, scannerContext); - - if (!kvs.isEmpty()) { - kvsIterator = kvs.iterator(); - } - - } - - @Override - public boolean hasNext() { - if (!kvsIterator.hasNext()) { - // refillKVS() method should be invoked only if !kvsIterator.hasNext() - if (!refillKVS()) { - return false; - } - } - return (kvsIterator.hasNext() || hasMore); - } - - @Override - public Cell next() { - if (!kvsIterator.hasNext()) { - // refillKVS() method should be invoked only if !kvsIterator.hasNext() - if (!refillKVS()) return null; - } - return (!hasMore) ? null : kvsIterator.next(); - } - - public void close() { - compactingScanner.close(); - compactingScanner = null; - scanner = null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Creates the scanner for compacting the pipeline. - * - * @return the scanner - */ - private StoreScanner createScanner(Store store, KeyValueScanner scanner) - throws IOException { - - Scan scan = new Scan(); - scan.setMaxVersions(); //Get all available versions - StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); - - return internalScanner; - } - - - - private boolean refillKVS() { - kvs.clear(); // clear previous KVS, first initiated in the constructor - if (!hasMore) { // if there is nothing expected next in compactingScanner - return false; - } - - try { // try to get next KVS - hasMore = compactingScanner.next(kvs, scannerContext); - } catch (IOException ie) { - throw new IllegalStateException(ie); - } - - if (!kvs.isEmpty() ) {// is the new KVS empty ? - kvsIterator = kvs.iterator(); - return true; - } else { - // KVS is empty, but hasMore still true? 
- if (hasMore) { // try to move to next row - return refillKVS(); - } - - } - return hasMore; - } - - -} - http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java new file mode 100644 index 0000000..f31c973 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -0,0 +1,149 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator + * and performs the scan for compaction operation meaning it is based on SQM + */ +@InterfaceAudience.Private +public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator { + + private List<Cell> kvs = new ArrayList<Cell>(); + private boolean hasMore; + private Iterator<Cell> kvsIterator; + + // scanner on top of pipeline scanner that uses ScanQueryMatcher + private StoreScanner compactingScanner; + + // C-tor + public MemStoreCompactorSegmentsIterator( + List<ImmutableSegment> segments, + CellComparator comparator, int compactionKVMax, Store store + ) throws IOException { + super(segments,comparator,compactionKVMax,store); + + // build the scanner based on Query Matcher + // reinitialize the compacting scanner for each instance of iterator + compactingScanner = createScanner(store, scanner); + + hasMore = compactingScanner.next(kvs, scannerContext); + + if (!kvs.isEmpty()) { + kvsIterator = kvs.iterator(); + } + + } + + @Override + public boolean hasNext() { + if (kvsIterator == null) { // for the case when the result is empty + return false; + } + if (!kvsIterator.hasNext()) { + // refillKVS() method should be invoked only if !kvsIterator.hasNext() + if (!refillKVS()) { + return false; + } + } + return kvsIterator.hasNext(); + } + + @Override + public Cell next() { + if (kvsIterator == null) { // for the case when the result is empty + return null; + } + if (!kvsIterator.hasNext()) { + // refillKVS() method should be invoked only 
if !kvsIterator.hasNext() + if (!refillKVS()) return null; + } + return (!hasMore) ? null : kvsIterator.next(); + } + + public void close() { + compactingScanner.close(); + compactingScanner = null; + scanner = null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Creates the scanner for compacting the pipeline. + * + * @return the scanner + */ + private StoreScanner createScanner(Store store, KeyValueScanner scanner) + throws IOException { + + Scan scan = new Scan(); + scan.setMaxVersions(); //Get all available versions + StoreScanner internalScanner = + new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + + return internalScanner; + } + + + /* Refill kev-value set (should be invoked only when KVS is empty) + * Returns true if KVS is non-empty */ + private boolean refillKVS() { + kvs.clear(); // clear previous KVS, first initiated in the constructor + if (!hasMore) { // if there is nothing expected next in compactingScanner + return false; + } + + try { // try to get next KVS + hasMore = compactingScanner.next(kvs, scannerContext); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + if (!kvs.isEmpty() ) {// is the new KVS empty ? + kvsIterator = kvs.iterator(); + return true; + } else { + // KVS is empty, but hasMore still true? 
+ if (hasMore) { // try to move to next row + return refillKVS(); + } + + } + return hasMore; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java new file mode 100644 index 0000000..625fc76 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java @@ -0,0 +1,68 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; +import java.util.List; + +/** + * The MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator + * and performs the scan for simple merge operation meaning it is NOT based on SQM + */ +@InterfaceAudience.Private +public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator { + + // C-tor + public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator, + int compactionKVMax, Store store + ) throws IOException { + super(segments,comparator,compactionKVMax,store); + } + + @Override + public boolean hasNext() { + return (scanner.peek()!=null); + } + + @Override + public Cell next() { + Cell result = null; + try { // try to get next + result = scanner.next(); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + return result; + } + + public void close() { + scanner.close(); + scanner = null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java new file mode 100644 index 0000000..8790bc2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; +import java.util.*; + +/** + * The MemStoreSegmentsIterator is designed to perform one iteration over given list of segments + * For another iteration new instance of MemStoreSegmentsIterator needs to be created + * The iterator is not thread-safe and must have only one instance per MemStore + * in each period of time + */ +@InterfaceAudience.Private +public abstract class MemStoreSegmentsIterator implements Iterator<Cell> { + + // scanner for full or partial pipeline (heap of segment scanners) + // we need to keep those scanners in order to close them at the end + protected KeyValueScanner scanner; + + protected final ScannerContext scannerContext; + + + // C-tor + public MemStoreSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator, + int compactionKVMax, Store store) throws IOException { + + this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + + // list of Scanners of segments in the pipeline, when compaction starts + List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(); + + // create the list of scanners with the smallest read point, meaning that + // only relevant KVs are going to be returned by the pipeline traversing + for (Segment segment : segments) { + scanners.add(segment.getScanner(store.getSmallestReadPoint())); + } + + scanner = new MemStoreScanner(comparator, scanners, true); + } + + public abstract void close(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 510ebbd..4f60976 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * A singleton store segment factory. 
@@ -46,18 +48,22 @@ public final class SegmentFactory { // create skip-list-based (non-flat) immutable segment from compacting old immutable segments public ImmutableSegment createImmutableSegment(final Configuration conf, - final CellComparator comparator, MemStoreCompactorIterator iterator) { + final CellComparator comparator, MemStoreSegmentsIterator iterator) { return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf)); } - // create new flat immutable segment from compacting old immutable segment - public ImmutableSegment createImmutableSegment(final Configuration conf, - final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells, - ImmutableSegment.Type segmentType) throws IOException { - Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, + // create new flat immutable segment from compacting old immutable segments + public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf, + final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, + ImmutableSegment.Type segmentType) + throws IOException { + Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, "wrong immutable segment type"); - return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf), numOfCells, - segmentType); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + return + // the last parameter "false" means not to merge, but to compact the pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); } // create empty immutable segment @@ -77,6 +83,19 @@ public final class SegmentFactory { return generateMutableSegment(conf, comparator, memStoreLAB); } + // create new flat immutable segment from merging old immutable segments + public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf, + final CellComparator comparator, MemStoreSegmentsIterator 
iterator, int numOfCells, + ImmutableSegment.Type segmentType, List<ImmutableSegment> segments) + throws IOException { + Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, + "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments); + return + // the last parameter "true" means to merge the compaction pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true); + } //****** private methods to instantiate concrete store segments **********// private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator, @@ -96,4 +115,11 @@ public final class SegmentFactory { return memStoreLAB; } + private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) { + List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>(); + for (ImmutableSegment segment : segments) { + mslabs.add(segment.getMemStoreLAB()); + } + return new ImmutableMemStoreLAB(mslabs); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java index 2e8bead..01160bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.util.LinkedList; +import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -43,7 +44,7 @@ public class VersionedSegmentsList { this.version = version; } - public 
LinkedList<ImmutableSegment> getStoreSegments() { + public List<ImmutableSegment> getStoreSegments() { return storeSegments; } http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 211a6d8..84efb09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -508,6 +508,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { + + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + ((CompactingMemStore)memstore).initiateType(); + byte[] row = Bytes.toBytes("testrow"); byte[] fam = Bytes.toBytes("testfamily"); byte[] qf1 = Bytes.toBytes("testqualifier1"); @@ -585,6 +590,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction1Bucket() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + ((CompactingMemStore)memstore).initiateType(); + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 // test 1 bucket @@ -609,6 +618,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction2Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + 
memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + ((CompactingMemStore)memstore).initiateType(); String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; @@ -647,6 +659,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction3Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + ((CompactingMemStore)memstore).initiateType(); String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; String[] keys3 = { "D", "B", "B" }; http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index f89a040..6499251 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.Threads; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -61,7 +62,8 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore compactingSetUp(); Configuration conf = HBaseConfiguration.create(); - 
conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store, @@ -215,18 +217,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } ////////////////////////////////////////////////////////////////////////////// - // Flattening tests + // Merging tests ////////////////////////////////////////////////////////////////////////////// @Test - public void testFlattening() throws IOException { + public void testMerging() throws IOException { String[] keys1 = { "A", "A", "B", "C", "F", "H"}; String[] keys2 = { "A", "B", "D", "G", "I", "J"}; String[] keys3 = { "D", "B", "B", "E" }; - // set flattening to true - memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); - + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction"); + ((CompactingMemStore)memstore).initiateType(); addRowsByKeys(memstore, keys1); ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact @@ -238,13 +239,31 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore addRowsByKeys(memstore, keys2); // also should only flatten + int counter2 = 0; + for ( Segment s : memstore.getSegments()) { + counter2 += s.getCellsCount(); + } + assertEquals(12, counter2); + ((CompactingMemStore) memstore).disableCompaction(); ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening assertEquals(0, memstore.getSnapshot().getCellsCount()); + int counter3 = 0; + for ( Segment s : memstore.getSegments()) { + counter3 += s.getCellsCount(); + } + assertEquals(12, counter3); + addRowsByKeys(memstore, keys3); + int counter4 = 0; + for ( Segment s : memstore.getSegments()) { + 
counter4 += s.getCellsCount(); + } + assertEquals(16, counter4); + ((CompactingMemStore) memstore).enableCompaction(); @@ -258,7 +277,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore for ( Segment s : memstore.getSegments()) { counter += s.getCellsCount(); } - assertEquals(10,counter); + assertEquals(16,counter); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot ImmutableSegment s = memstore.getSnapshot(); @@ -295,7 +314,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore Threads.sleep(10); } // Just doing the cnt operation here - MemStoreCompactorIterator itr = new MemStoreCompactorIterator( + MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); int cnt = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 1ea5112..fdd6b2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -172,7 +172,7 @@ public class TestMemStoreLAB { public void testLABChunkQueue() throws Exception { HeapMemStoreLAB mslab = new HeapMemStoreLAB(); // by default setting, there should be no chunk queue initialized - assertNull(mslab.getChunkQueue()); + assertNull(mslab.getPooledChunks()); // reset mslab with chunk pool Configuration conf = HBaseConfiguration.create(); conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1); @@ -209,7 +209,7 @@ 
public class TestMemStoreLAB { // close the mslab mslab.close(); // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getChunkQueue().size(); + int queueLength = mslab.getPooledChunks().size(); assertTrue("All chunks in chunk queue should be reclaimed or removed" + " after mslab closed but actually: " + queueLength, queueLength == 0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/988d1f9b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 6bfaa59..f0f8c39 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -130,7 +130,7 @@ public class TestPerColumnFamilyFlush { conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 100 * 1024); // Intialize the region - Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf); // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i));